Merge pull request #10782 from taosdata/feature/3.0_mhli
Feature/3.0 mhli
This commit is contained in:
commit
c660c9ad35
|
@ -31,10 +31,10 @@ extern "C" {
|
||||||
#define TIMER_MAX_MS 0x7FFFFFFF
|
#define TIMER_MAX_MS 0x7FFFFFFF
|
||||||
#define ENV_TICK_TIMER_MS 1000
|
#define ENV_TICK_TIMER_MS 1000
|
||||||
#define PING_TIMER_MS 1000
|
#define PING_TIMER_MS 1000
|
||||||
#define ELECT_TIMER_MS_MIN 150
|
#define ELECT_TIMER_MS_MIN 1500
|
||||||
#define ELECT_TIMER_MS_MAX 300
|
#define ELECT_TIMER_MS_MAX 3000
|
||||||
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
|
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
|
||||||
#define HEARTBEAT_TIMER_MS 30
|
#define HEARTBEAT_TIMER_MS 300
|
||||||
|
|
||||||
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
|
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
|
||||||
|
|
||||||
|
|
|
@ -48,6 +48,9 @@ void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId);
|
||||||
void raftStoreClearVote(SRaftStore *pRaftStore);
|
void raftStoreClearVote(SRaftStore *pRaftStore);
|
||||||
void raftStoreNextTerm(SRaftStore *pRaftStore);
|
void raftStoreNextTerm(SRaftStore *pRaftStore);
|
||||||
void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term);
|
void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term);
|
||||||
|
int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson);
|
||||||
|
cJSON * raftStore2Json(SRaftStore *pRaftStore);
|
||||||
|
char * raftStore2Str(SRaftStore *pRaftStore);
|
||||||
|
|
||||||
// for debug -------------------
|
// for debug -------------------
|
||||||
void raftStorePrint(SRaftStore *pObj);
|
void raftStorePrint(SRaftStore *pObj);
|
||||||
|
|
|
@ -102,7 +102,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
|
|
||||||
SyncTerm localPreLogTerm = 0;
|
SyncTerm localPreLogTerm = 0;
|
||||||
if (pMsg->prevLogTerm >= SYNC_INDEX_BEGIN && pMsg->prevLogTerm <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
if (pMsg->prevLogTerm >= SYNC_INDEX_BEGIN && pMsg->prevLogTerm <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||||
SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogTerm);
|
SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex);
|
||||||
assert(pEntry != NULL);
|
assert(pEntry != NULL);
|
||||||
localPreLogTerm = pEntry->term;
|
localPreLogTerm = pEntry->term;
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestory(pEntry);
|
||||||
|
@ -111,9 +111,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
bool logOK =
|
bool logOK =
|
||||||
(pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
|
(pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
|
||||||
((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
|
((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
|
||||||
(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogIndex == localPreLogTerm));
|
(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm));
|
||||||
|
|
||||||
// reject
|
// reject request
|
||||||
if ((pMsg->term < ths->pRaftStore->currentTerm) ||
|
if ((pMsg->term < ths->pRaftStore->currentTerm) ||
|
||||||
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
|
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
|
||||||
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
|
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
|
||||||
|
@ -134,6 +134,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
// return to follower state
|
// return to follower state
|
||||||
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
|
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
|
||||||
syncNodeBecomeFollower(ths);
|
syncNodeBecomeFollower(ths);
|
||||||
|
|
||||||
|
// need ret?
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
// accept request
|
// accept request
|
||||||
|
@ -144,17 +147,17 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
matchSuccess = true;
|
matchSuccess = true;
|
||||||
}
|
}
|
||||||
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||||
SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogTerm);
|
SSyncRaftEntry* pPreEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex);
|
||||||
assert(pEntry != NULL);
|
assert(pPreEntry != NULL);
|
||||||
if (pMsg->prevLogTerm == pEntry->term) {
|
if (pMsg->prevLogTerm == pPreEntry->term) {
|
||||||
matchSuccess = true;
|
matchSuccess = true;
|
||||||
}
|
}
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestory(pPreEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (matchSuccess) {
|
if (matchSuccess) {
|
||||||
// delete conflict entries
|
// delete conflict entries
|
||||||
if (ths->pLogStore->getLastIndex(ths->pLogStore) > pMsg->prevLogIndex) {
|
if (pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||||
SyncIndex fromIndex = pMsg->prevLogIndex + 1;
|
SyncIndex fromIndex = pMsg->prevLogIndex + 1;
|
||||||
ths->pLogStore->truncate(ths->pLogStore, fromIndex);
|
ths->pLogStore->truncate(ths->pLogStore, fromIndex);
|
||||||
}
|
}
|
||||||
|
@ -178,6 +181,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||||
|
|
||||||
syncAppendEntriesReplyDestroy(pReply);
|
syncAppendEntriesReplyDestroy(pReply);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
|
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
|
||||||
pReply->srcId = ths->myRaftId;
|
pReply->srcId = ths->myRaftId;
|
||||||
|
|
|
@ -51,10 +51,10 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
|
||||||
assert(pMsg->term == ths->pRaftStore->currentTerm);
|
assert(pMsg->term == ths->pRaftStore->currentTerm);
|
||||||
|
|
||||||
if (pMsg->success) {
|
if (pMsg->success) {
|
||||||
// nextIndex = reply.matchIndex + 1
|
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
|
||||||
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
|
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
|
||||||
|
|
||||||
// matchIndex = reply.matchIndex
|
// matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
|
||||||
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
|
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
|
||||||
|
|
||||||
// maybe commit
|
// maybe commit
|
||||||
|
@ -62,6 +62,8 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
|
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
|
||||||
|
|
||||||
|
// notice! int64, uint64
|
||||||
if (nextIndex > SYNC_INDEX_BEGIN) {
|
if (nextIndex > SYNC_INDEX_BEGIN) {
|
||||||
--nextIndex;
|
--nextIndex;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "syncIndexMgr.h"
|
||||||
|
#include "syncInt.h"
|
||||||
|
|
||||||
|
// \* Leader i advances its commitIndex.
|
||||||
|
// \* This is done as a separate step from handling AppendEntries responses,
|
||||||
|
// \* in part to minimize atomic regions, and in part so that leaders of
|
||||||
|
// \* single-server clusters are able to mark entries committed.
|
||||||
|
// AdvanceCommitIndex(i) ==
|
||||||
|
// /\ state[i] = Leader
|
||||||
|
// /\ LET \* The set of servers that agree up through index.
|
||||||
|
// Agree(index) == {i} \cup {k \in Server :
|
||||||
|
// matchIndex[i][k] >= index}
|
||||||
|
// \* The maximum indexes for which a quorum agrees
|
||||||
|
// agreeIndexes == {index \in 1..Len(log[i]) :
|
||||||
|
// Agree(index) \in Quorum}
|
||||||
|
// \* New value for commitIndex'[i]
|
||||||
|
// newCommitIndex ==
|
||||||
|
// IF /\ agreeIndexes /= {}
|
||||||
|
// /\ log[i][Max(agreeIndexes)].term = currentTerm[i]
|
||||||
|
// THEN
|
||||||
|
// Max(agreeIndexes)
|
||||||
|
// ELSE
|
||||||
|
// commitIndex[i]
|
||||||
|
// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
|
||||||
|
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
|
||||||
|
//
|
||||||
|
void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
|
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex);
|
||||||
|
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex);
|
||||||
|
}
|
|
@ -50,6 +50,7 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
||||||
|
int32_t ret = 0;
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
|
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
|
||||||
syncNodeFollower2Candidate(pSyncNode);
|
syncNodeFollower2Candidate(pSyncNode);
|
||||||
}
|
}
|
||||||
|
@ -62,7 +63,15 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
||||||
votesRespondReset(pSyncNode->pVotesRespond, pSyncNode->pRaftStore->currentTerm);
|
votesRespondReset(pSyncNode->pVotesRespond, pSyncNode->pRaftStore->currentTerm);
|
||||||
|
|
||||||
syncNodeVoteForSelf(pSyncNode);
|
syncNodeVoteForSelf(pSyncNode);
|
||||||
int32_t ret = syncNodeRequestVotePeers(pSyncNode);
|
if (voteGrantedMajority(pSyncNode->pVotesGranted)) {
|
||||||
|
// only myself, to leader
|
||||||
|
assert(!pSyncNode->pVotesGranted->toLeader);
|
||||||
|
syncNodeCandidate2Leader(pSyncNode);
|
||||||
|
pSyncNode->pVotesGranted->toLeader = true;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = syncNodeRequestVotePeers(pSyncNode);
|
||||||
assert(ret == 0);
|
assert(ret == 0);
|
||||||
syncNodeResetElectTimer(pSyncNode);
|
syncNodeResetElectTimer(pSyncNode);
|
||||||
|
|
||||||
|
|
|
@ -269,6 +269,7 @@ static void *syncIOConsumerFunc(void *param) {
|
||||||
} else if (pRpcMsg->msgType == SYNC_PING_REPLY) {
|
} else if (pRpcMsg->msgType == SYNC_PING_REPLY) {
|
||||||
if (io->FpOnSyncPingReply != NULL) {
|
if (io->FpOnSyncPingReply != NULL) {
|
||||||
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
|
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
|
||||||
|
assert(pSyncMsg != NULL);
|
||||||
io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg);
|
io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg);
|
||||||
syncPingReplyDestroy(pSyncMsg);
|
syncPingReplyDestroy(pSyncMsg);
|
||||||
}
|
}
|
||||||
|
@ -276,6 +277,7 @@ static void *syncIOConsumerFunc(void *param) {
|
||||||
} else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) {
|
} else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) {
|
||||||
if (io->FpOnSyncClientRequest != NULL) {
|
if (io->FpOnSyncClientRequest != NULL) {
|
||||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
||||||
|
assert(pSyncMsg != NULL);
|
||||||
io->FpOnSyncClientRequest(io->pSyncNode, pSyncMsg);
|
io->FpOnSyncClientRequest(io->pSyncNode, pSyncMsg);
|
||||||
syncClientRequestDestroy(pSyncMsg);
|
syncClientRequestDestroy(pSyncMsg);
|
||||||
}
|
}
|
||||||
|
@ -283,6 +285,7 @@ static void *syncIOConsumerFunc(void *param) {
|
||||||
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) {
|
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) {
|
||||||
if (io->FpOnSyncRequestVote != NULL) {
|
if (io->FpOnSyncRequestVote != NULL) {
|
||||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
||||||
|
assert(pSyncMsg != NULL);
|
||||||
io->FpOnSyncRequestVote(io->pSyncNode, pSyncMsg);
|
io->FpOnSyncRequestVote(io->pSyncNode, pSyncMsg);
|
||||||
syncRequestVoteDestroy(pSyncMsg);
|
syncRequestVoteDestroy(pSyncMsg);
|
||||||
}
|
}
|
||||||
|
@ -290,6 +293,7 @@ static void *syncIOConsumerFunc(void *param) {
|
||||||
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) {
|
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) {
|
||||||
if (io->FpOnSyncRequestVoteReply != NULL) {
|
if (io->FpOnSyncRequestVoteReply != NULL) {
|
||||||
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
|
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
|
||||||
|
assert(pSyncMsg != NULL);
|
||||||
io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg);
|
io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg);
|
||||||
syncRequestVoteReplyDestroy(pSyncMsg);
|
syncRequestVoteReplyDestroy(pSyncMsg);
|
||||||
}
|
}
|
||||||
|
@ -297,6 +301,7 @@ static void *syncIOConsumerFunc(void *param) {
|
||||||
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) {
|
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) {
|
||||||
if (io->FpOnSyncAppendEntries != NULL) {
|
if (io->FpOnSyncAppendEntries != NULL) {
|
||||||
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
|
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
|
||||||
|
assert(pSyncMsg != NULL);
|
||||||
io->FpOnSyncAppendEntries(io->pSyncNode, pSyncMsg);
|
io->FpOnSyncAppendEntries(io->pSyncNode, pSyncMsg);
|
||||||
syncAppendEntriesDestroy(pSyncMsg);
|
syncAppendEntriesDestroy(pSyncMsg);
|
||||||
}
|
}
|
||||||
|
@ -304,6 +309,7 @@ static void *syncIOConsumerFunc(void *param) {
|
||||||
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) {
|
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) {
|
||||||
if (io->FpOnSyncAppendEntriesReply != NULL) {
|
if (io->FpOnSyncAppendEntriesReply != NULL) {
|
||||||
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
|
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
|
||||||
|
assert(pSyncMsg != NULL);
|
||||||
io->FpOnSyncAppendEntriesReply(io->pSyncNode, pSyncMsg);
|
io->FpOnSyncAppendEntriesReply(io->pSyncNode, pSyncMsg);
|
||||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||||
}
|
}
|
||||||
|
@ -311,6 +317,7 @@ static void *syncIOConsumerFunc(void *param) {
|
||||||
} else if (pRpcMsg->msgType == SYNC_TIMEOUT) {
|
} else if (pRpcMsg->msgType == SYNC_TIMEOUT) {
|
||||||
if (io->FpOnSyncTimeout != NULL) {
|
if (io->FpOnSyncTimeout != NULL) {
|
||||||
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
|
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
|
||||||
|
assert(pSyncMsg != NULL);
|
||||||
io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg);
|
io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg);
|
||||||
syncTimeoutDestroy(pSyncMsg);
|
syncTimeoutDestroy(pSyncMsg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,6 +70,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
|
||||||
char u64buf[128];
|
char u64buf[128];
|
||||||
cJSON *pRoot = cJSON_CreateObject();
|
cJSON *pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pSyncIndexMgr != NULL) {
|
||||||
cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncIndexMgr->replicaNum);
|
cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncIndexMgr->replicaNum);
|
||||||
cJSON *pReplicas = cJSON_CreateArray();
|
cJSON *pReplicas = cJSON_CreateArray();
|
||||||
cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
|
cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
|
||||||
|
@ -86,6 +87,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
|
||||||
cJSON_AddItemToObject(pRoot, "index", pIndex);
|
cJSON_AddItemToObject(pRoot, "index", pIndex);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncIndexMgr->pSyncNode);
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncIndexMgr->pSyncNode);
|
||||||
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
|
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
|
||||||
|
}
|
||||||
|
|
||||||
cJSON *pJson = cJSON_CreateObject();
|
cJSON *pJson = cJSON_CreateObject();
|
||||||
cJSON_AddItemToObject(pJson, "pSyncIndexMgr", pRoot);
|
cJSON_AddItemToObject(pJson, "pSyncIndexMgr", pRoot);
|
||||||
|
|
|
@ -103,6 +103,12 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
|
||||||
assert(pSyncNode != NULL);
|
assert(pSyncNode != NULL);
|
||||||
memset(pSyncNode, 0, sizeof(SSyncNode));
|
memset(pSyncNode, 0, sizeof(SSyncNode));
|
||||||
|
|
||||||
|
if (taosMkDir(pSyncInfo->path) != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
// init by SSyncInfo
|
// init by SSyncInfo
|
||||||
pSyncNode->vgId = pSyncInfo->vgId;
|
pSyncNode->vgId = pSyncInfo->vgId;
|
||||||
pSyncNode->syncCfg = pSyncInfo->syncCfg;
|
pSyncNode->syncCfg = pSyncInfo->syncCfg;
|
||||||
|
@ -200,6 +206,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
|
||||||
pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
|
pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
|
||||||
pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
|
pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
|
||||||
|
|
||||||
|
// start raft
|
||||||
|
syncNodeBecomeFollower(pSyncNode);
|
||||||
|
|
||||||
return pSyncNode;
|
return pSyncNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -355,6 +364,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
|
||||||
char u64buf[128];
|
char u64buf[128];
|
||||||
cJSON* pRoot = cJSON_CreateObject();
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pSyncNode != NULL) {
|
||||||
// init by SSyncInfo
|
// init by SSyncInfo
|
||||||
cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId);
|
cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId);
|
||||||
cJSON_AddStringToObject(pRoot, "path", pSyncNode->path);
|
cJSON_AddStringToObject(pRoot, "path", pSyncNode->path);
|
||||||
|
@ -477,6 +487,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
|
||||||
cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf);
|
cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout);
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout);
|
||||||
cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf);
|
cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf);
|
||||||
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
cJSON_AddItemToObject(pJson, "SSyncNode", pRoot);
|
cJSON_AddItemToObject(pJson, "SSyncNode", pRoot);
|
||||||
|
@ -500,15 +511,17 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
|
void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
|
||||||
|
// maybe clear leader cache
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
pSyncNode->leaderCache = EMPTY_RAFT_ID;
|
pSyncNode->leaderCache = EMPTY_RAFT_ID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// state change
|
||||||
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||||
|
|
||||||
int32_t electMS = syncUtilElectRandomMS();
|
// reset elect timer
|
||||||
syncNodeRestartElectTimer(pSyncNode, electMS);
|
syncNodeResetElectTimer(pSyncNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TLA+ Spec
|
// TLA+ Spec
|
||||||
|
@ -530,20 +543,32 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
|
||||||
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
|
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
|
||||||
//
|
//
|
||||||
void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
|
void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
|
||||||
|
// state change
|
||||||
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
|
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
|
||||||
|
|
||||||
|
// set leader cache
|
||||||
pSyncNode->leaderCache = pSyncNode->myRaftId;
|
pSyncNode->leaderCache = pSyncNode->myRaftId;
|
||||||
|
|
||||||
for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
|
for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
|
||||||
|
// maybe overwrite myself, no harm
|
||||||
|
// just do it!
|
||||||
pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
|
pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
|
for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
|
||||||
|
// maybe overwrite myself, no harm
|
||||||
|
// just do it!
|
||||||
pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
|
pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// stop elect timer
|
||||||
syncNodeStopElectTimer(pSyncNode);
|
syncNodeStopElectTimer(pSyncNode);
|
||||||
syncNodeStartHeartbeatTimer(pSyncNode);
|
|
||||||
|
// start replicate right now!
|
||||||
syncNodeReplicate(pSyncNode);
|
syncNodeReplicate(pSyncNode);
|
||||||
|
|
||||||
|
// start heartbeat timer
|
||||||
|
syncNodeStartHeartbeatTimer(pSyncNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
|
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
|
||||||
|
@ -568,6 +593,9 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// raft vote --------------
|
// raft vote --------------
|
||||||
|
|
||||||
|
// just called by syncNodeVoteForSelf
|
||||||
|
// need assert
|
||||||
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
|
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
|
||||||
assert(term == pSyncNode->pRaftStore->currentTerm);
|
assert(term == pSyncNode->pRaftStore->currentTerm);
|
||||||
assert(!raftStoreHasVoted(pSyncNode->pRaftStore));
|
assert(!raftStoreHasVoted(pSyncNode->pRaftStore));
|
||||||
|
@ -575,6 +603,7 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId)
|
||||||
raftStoreVote(pSyncNode->pRaftStore, pRaftId);
|
raftStoreVote(pSyncNode->pRaftStore, pRaftId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// simulate get vote from outside
|
||||||
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
|
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
|
||||||
syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId));
|
syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId));
|
||||||
|
|
||||||
|
@ -589,8 +618,6 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
|
||||||
syncRequestVoteReplyDestroy(pMsg);
|
syncRequestVoteReplyDestroy(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {}
|
|
||||||
|
|
||||||
// for debug --------------
|
// for debug --------------
|
||||||
void syncNodePrint(SSyncNode* pObj) {
|
void syncNodePrint(SSyncNode* pObj) {
|
||||||
char* serialized = syncNode2Str(pObj);
|
char* serialized = syncNode2Str(pObj);
|
||||||
|
@ -676,7 +703,8 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
|
||||||
taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||||
&pSyncNode->pHeartbeatTimer);
|
&pSyncNode->pHeartbeatTimer);
|
||||||
} else {
|
} else {
|
||||||
sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64 "",
|
sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64
|
||||||
|
"",
|
||||||
pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
|
pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -213,16 +213,18 @@ SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||||
|
|
||||||
cJSON* syncTimeout2Json(const SyncTimeout* pMsg) {
|
cJSON* syncTimeout2Json(const SyncTimeout* pMsg) {
|
||||||
char u64buf[128];
|
char u64buf[128];
|
||||||
|
|
||||||
cJSON* pRoot = cJSON_CreateObject();
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pMsg != NULL) {
|
||||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||||
cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType);
|
cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->logicClock);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->logicClock);
|
||||||
cJSON_AddStringToObject(pRoot, "logicClock", u64buf);
|
cJSON_AddStringToObject(pRoot, "logicClock", u64buf);
|
||||||
cJSON_AddNumberToObject(pRoot, "timerMS", pMsg->timerMS);
|
cJSON_AddNumberToObject(pRoot, "timerMS", pMsg->timerMS);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data);
|
snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data);
|
||||||
cJSON_AddStringToObject(pRoot, "data", u64buf);
|
cJSON_AddStringToObject(pRoot, "data", u64buf);
|
||||||
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
cJSON_AddItemToObject(pJson, "SyncTimeout", pRoot);
|
cJSON_AddItemToObject(pJson, "SyncTimeout", pRoot);
|
||||||
|
@ -343,13 +345,14 @@ SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||||
|
|
||||||
cJSON* syncPing2Json(const SyncPing* pMsg) {
|
cJSON* syncPing2Json(const SyncPing* pMsg) {
|
||||||
char u64buf[128];
|
char u64buf[128];
|
||||||
|
|
||||||
cJSON* pRoot = cJSON_CreateObject();
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pMsg != NULL) {
|
||||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||||
|
|
||||||
cJSON* pSrcId = cJSON_CreateObject();
|
cJSON* pSrcId = cJSON_CreateObject();
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr);
|
||||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||||
{
|
{
|
||||||
uint64_t u64 = pMsg->srcId.addr;
|
uint64_t u64 = pMsg->srcId.addr;
|
||||||
|
@ -364,7 +367,7 @@ cJSON* syncPing2Json(const SyncPing* pMsg) {
|
||||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||||
|
|
||||||
cJSON* pDestId = cJSON_CreateObject();
|
cJSON* pDestId = cJSON_CreateObject();
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->destId.addr);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr);
|
||||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||||
{
|
{
|
||||||
uint64_t u64 = pMsg->destId.addr;
|
uint64_t u64 = pMsg->destId.addr;
|
||||||
|
@ -386,6 +389,7 @@ cJSON* syncPing2Json(const SyncPing* pMsg) {
|
||||||
s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen);
|
s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen);
|
||||||
cJSON_AddStringToObject(pRoot, "data2", s);
|
cJSON_AddStringToObject(pRoot, "data2", s);
|
||||||
free(s);
|
free(s);
|
||||||
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
cJSON_AddItemToObject(pJson, "SyncPing", pRoot);
|
cJSON_AddItemToObject(pJson, "SyncPing", pRoot);
|
||||||
|
@ -506,13 +510,14 @@ SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||||
|
|
||||||
cJSON* syncPingReply2Json(const SyncPingReply* pMsg) {
|
cJSON* syncPingReply2Json(const SyncPingReply* pMsg) {
|
||||||
char u64buf[128];
|
char u64buf[128];
|
||||||
|
|
||||||
cJSON* pRoot = cJSON_CreateObject();
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pMsg != NULL) {
|
||||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||||
|
|
||||||
cJSON* pSrcId = cJSON_CreateObject();
|
cJSON* pSrcId = cJSON_CreateObject();
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr);
|
||||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||||
{
|
{
|
||||||
uint64_t u64 = pMsg->srcId.addr;
|
uint64_t u64 = pMsg->srcId.addr;
|
||||||
|
@ -527,7 +532,7 @@ cJSON* syncPingReply2Json(const SyncPingReply* pMsg) {
|
||||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||||
|
|
||||||
cJSON* pDestId = cJSON_CreateObject();
|
cJSON* pDestId = cJSON_CreateObject();
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->destId.addr);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr);
|
||||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||||
{
|
{
|
||||||
uint64_t u64 = pMsg->destId.addr;
|
uint64_t u64 = pMsg->destId.addr;
|
||||||
|
@ -549,6 +554,7 @@ cJSON* syncPingReply2Json(const SyncPingReply* pMsg) {
|
||||||
s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen);
|
s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen);
|
||||||
cJSON_AddStringToObject(pRoot, "data2", s);
|
cJSON_AddStringToObject(pRoot, "data2", s);
|
||||||
free(s);
|
free(s);
|
||||||
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
cJSON_AddItemToObject(pJson, "SyncPingReply", pRoot);
|
cJSON_AddItemToObject(pJson, "SyncPingReply", pRoot);
|
||||||
|
@ -665,12 +671,13 @@ SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||||
|
|
||||||
cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) {
|
cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) {
|
||||||
char u64buf[128];
|
char u64buf[128];
|
||||||
|
|
||||||
cJSON* pRoot = cJSON_CreateObject();
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pMsg != NULL) {
|
||||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||||
cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType);
|
cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->seqNum);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->seqNum);
|
||||||
cJSON_AddStringToObject(pRoot, "seqNum", u64buf);
|
cJSON_AddStringToObject(pRoot, "seqNum", u64buf);
|
||||||
cJSON_AddNumberToObject(pRoot, "isWeak", pMsg->isWeak);
|
cJSON_AddNumberToObject(pRoot, "isWeak", pMsg->isWeak);
|
||||||
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
|
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
|
||||||
|
@ -682,6 +689,7 @@ cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) {
|
||||||
s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen);
|
s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen);
|
||||||
cJSON_AddStringToObject(pRoot, "data2", s);
|
cJSON_AddStringToObject(pRoot, "data2", s);
|
||||||
free(s);
|
free(s);
|
||||||
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
cJSON_AddItemToObject(pJson, "SyncClientRequest", pRoot);
|
cJSON_AddItemToObject(pJson, "SyncClientRequest", pRoot);
|
||||||
|
@ -786,13 +794,14 @@ SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||||
|
|
||||||
cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) {
|
cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) {
|
||||||
char u64buf[128];
|
char u64buf[128];
|
||||||
|
|
||||||
cJSON* pRoot = cJSON_CreateObject();
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pMsg != NULL) {
|
||||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||||
|
|
||||||
cJSON* pSrcId = cJSON_CreateObject();
|
cJSON* pSrcId = cJSON_CreateObject();
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr);
|
||||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||||
{
|
{
|
||||||
uint64_t u64 = pMsg->srcId.addr;
|
uint64_t u64 = pMsg->srcId.addr;
|
||||||
|
@ -820,12 +829,13 @@ cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) {
|
||||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->term);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
|
||||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->lastLogIndex);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogIndex);
|
||||||
cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf);
|
cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->lastLogTerm);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogTerm);
|
||||||
cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf);
|
cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf);
|
||||||
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
cJSON_AddItemToObject(pJson, "SyncRequestVote", pRoot);
|
cJSON_AddItemToObject(pJson, "SyncRequestVote", pRoot);
|
||||||
|
@ -930,13 +940,14 @@ SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||||
|
|
||||||
cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) {
|
cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) {
|
||||||
char u64buf[128];
|
char u64buf[128];
|
||||||
|
|
||||||
cJSON* pRoot = cJSON_CreateObject();
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pMsg != NULL) {
|
||||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||||
|
|
||||||
cJSON* pSrcId = cJSON_CreateObject();
|
cJSON* pSrcId = cJSON_CreateObject();
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr);
|
||||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||||
{
|
{
|
||||||
uint64_t u64 = pMsg->srcId.addr;
|
uint64_t u64 = pMsg->srcId.addr;
|
||||||
|
@ -964,9 +975,10 @@ cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) {
|
||||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->term);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
|
||||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||||
cJSON_AddNumberToObject(pRoot, "vote_granted", pMsg->voteGranted);
|
cJSON_AddNumberToObject(pRoot, "vote_granted", pMsg->voteGranted);
|
||||||
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
cJSON_AddItemToObject(pJson, "SyncRequestVoteReply", pRoot);
|
cJSON_AddItemToObject(pJson, "SyncRequestVoteReply", pRoot);
|
||||||
|
@ -1073,13 +1085,14 @@ SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||||
|
|
||||||
cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) {
|
cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) {
|
||||||
char u64buf[128];
|
char u64buf[128];
|
||||||
|
|
||||||
cJSON* pRoot = cJSON_CreateObject();
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pMsg != NULL) {
|
||||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||||
|
|
||||||
cJSON* pSrcId = cJSON_CreateObject();
|
cJSON* pSrcId = cJSON_CreateObject();
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr);
|
||||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||||
{
|
{
|
||||||
uint64_t u64 = pMsg->srcId.addr;
|
uint64_t u64 = pMsg->srcId.addr;
|
||||||
|
@ -1094,7 +1107,7 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) {
|
||||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||||
|
|
||||||
cJSON* pDestId = cJSON_CreateObject();
|
cJSON* pDestId = cJSON_CreateObject();
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->destId.addr);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr);
|
||||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||||
{
|
{
|
||||||
uint64_t u64 = pMsg->destId.addr;
|
uint64_t u64 = pMsg->destId.addr;
|
||||||
|
@ -1108,16 +1121,16 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) {
|
||||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->term);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
|
||||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->prevLogIndex);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogIndex);
|
||||||
cJSON_AddStringToObject(pRoot, "pre_log_index", u64buf);
|
cJSON_AddStringToObject(pRoot, "pre_log_index", u64buf);
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->prevLogTerm);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogTerm);
|
||||||
cJSON_AddStringToObject(pRoot, "pre_log_term", u64buf);
|
cJSON_AddStringToObject(pRoot, "pre_log_term", u64buf);
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->commitIndex);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->commitIndex);
|
||||||
cJSON_AddStringToObject(pRoot, "commit_index", u64buf);
|
cJSON_AddStringToObject(pRoot, "commit_index", u64buf);
|
||||||
|
|
||||||
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
|
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
|
||||||
|
@ -1128,6 +1141,7 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) {
|
||||||
s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen);
|
s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen);
|
||||||
cJSON_AddStringToObject(pRoot, "data2", s);
|
cJSON_AddStringToObject(pRoot, "data2", s);
|
||||||
free(s);
|
free(s);
|
||||||
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
cJSON_AddItemToObject(pJson, "SyncAppendEntries", pRoot);
|
cJSON_AddItemToObject(pJson, "SyncAppendEntries", pRoot);
|
||||||
|
@ -1232,13 +1246,14 @@ SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg
|
||||||
|
|
||||||
cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
|
cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
|
||||||
char u64buf[128];
|
char u64buf[128];
|
||||||
|
|
||||||
cJSON* pRoot = cJSON_CreateObject();
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pMsg != NULL) {
|
||||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||||
|
|
||||||
cJSON* pSrcId = cJSON_CreateObject();
|
cJSON* pSrcId = cJSON_CreateObject();
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr);
|
||||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||||
{
|
{
|
||||||
uint64_t u64 = pMsg->srcId.addr;
|
uint64_t u64 = pMsg->srcId.addr;
|
||||||
|
@ -1253,7 +1268,7 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
|
||||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||||
|
|
||||||
cJSON* pDestId = cJSON_CreateObject();
|
cJSON* pDestId = cJSON_CreateObject();
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->destId.addr);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr);
|
||||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||||
{
|
{
|
||||||
uint64_t u64 = pMsg->destId.addr;
|
uint64_t u64 = pMsg->destId.addr;
|
||||||
|
@ -1267,11 +1282,12 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
|
||||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->term);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
|
||||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||||
cJSON_AddNumberToObject(pRoot, "success", pMsg->success);
|
cJSON_AddNumberToObject(pRoot, "success", pMsg->success);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->matchIndex);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->matchIndex);
|
||||||
cJSON_AddStringToObject(pRoot, "matchIndex", u64buf);
|
cJSON_AddStringToObject(pRoot, "matchIndex", u64buf);
|
||||||
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
cJSON_AddItemToObject(pJson, "SyncAppendEntriesReply", pRoot);
|
cJSON_AddItemToObject(pJson, "SyncAppendEntriesReply", pRoot);
|
||||||
|
|
|
@ -69,17 +69,18 @@ SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len) {
|
||||||
|
|
||||||
cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) {
|
cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) {
|
||||||
char u64buf[128];
|
char u64buf[128];
|
||||||
|
|
||||||
cJSON* pRoot = cJSON_CreateObject();
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pEntry != NULL) {
|
||||||
cJSON_AddNumberToObject(pRoot, "bytes", pEntry->bytes);
|
cJSON_AddNumberToObject(pRoot, "bytes", pEntry->bytes);
|
||||||
cJSON_AddNumberToObject(pRoot, "msgType", pEntry->msgType);
|
cJSON_AddNumberToObject(pRoot, "msgType", pEntry->msgType);
|
||||||
cJSON_AddNumberToObject(pRoot, "originalRpcType", pEntry->originalRpcType);
|
cJSON_AddNumberToObject(pRoot, "originalRpcType", pEntry->originalRpcType);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pEntry->seqNum);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->seqNum);
|
||||||
cJSON_AddStringToObject(pRoot, "seqNum", u64buf);
|
cJSON_AddStringToObject(pRoot, "seqNum", u64buf);
|
||||||
cJSON_AddNumberToObject(pRoot, "isWeak", pEntry->isWeak);
|
cJSON_AddNumberToObject(pRoot, "isWeak", pEntry->isWeak);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pEntry->term);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->term);
|
||||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pEntry->index);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->index);
|
||||||
cJSON_AddStringToObject(pRoot, "index", u64buf);
|
cJSON_AddStringToObject(pRoot, "index", u64buf);
|
||||||
cJSON_AddNumberToObject(pRoot, "dataLen", pEntry->dataLen);
|
cJSON_AddNumberToObject(pRoot, "dataLen", pEntry->dataLen);
|
||||||
|
|
||||||
|
@ -91,6 +92,7 @@ cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) {
|
||||||
s = syncUtilprintBin2((char*)(pEntry->data), pEntry->dataLen);
|
s = syncUtilprintBin2((char*)(pEntry->data), pEntry->dataLen);
|
||||||
cJSON_AddStringToObject(pRoot, "data2", s);
|
cJSON_AddStringToObject(pRoot, "data2", s);
|
||||||
free(s);
|
free(s);
|
||||||
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
cJSON_AddItemToObject(pJson, "SSyncRaftEntry", pRoot);
|
cJSON_AddItemToObject(pJson, "SSyncRaftEntry", pRoot);
|
||||||
|
|
|
@ -65,8 +65,9 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
||||||
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
SSyncLogStoreData* pData = pLogStore->data;
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
SWal* pWal = pData->pWal;
|
SWal* pWal = pData->pWal;
|
||||||
SSyncRaftEntry* pEntry;
|
SSyncRaftEntry* pEntry = NULL;
|
||||||
|
|
||||||
|
if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) {
|
||||||
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
|
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
|
||||||
walReadWithHandle(pWalHandle, index);
|
walReadWithHandle(pWalHandle, index);
|
||||||
pEntry = syncEntryDeserialize(pWalHandle->pHead->head.body, pWalHandle->pHead->head.len);
|
pEntry = syncEntryDeserialize(pWalHandle->pHead->head.body, pWalHandle->pHead->head.len);
|
||||||
|
@ -74,6 +75,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
|
|
||||||
// need to hold, do not new every time!!
|
// need to hold, do not new every time!!
|
||||||
walCloseReadHandle(pWalHandle);
|
walCloseReadHandle(pWalHandle);
|
||||||
|
}
|
||||||
|
|
||||||
return pEntry;
|
return pEntry;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,16 +130,17 @@ SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) {
|
||||||
|
|
||||||
cJSON* logStore2Json(SSyncLogStore* pLogStore) {
|
cJSON* logStore2Json(SSyncLogStore* pLogStore) {
|
||||||
char u64buf[128];
|
char u64buf[128];
|
||||||
|
|
||||||
SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
|
SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
|
||||||
cJSON* pRoot = cJSON_CreateObject();
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pData != NULL && pData->pWal != NULL) {
|
||||||
snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode);
|
snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode);
|
||||||
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
|
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
|
snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
|
||||||
cJSON_AddStringToObject(pRoot, "pWal", u64buf);
|
cJSON_AddStringToObject(pRoot, "pWal", u64buf);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64 "", logStoreLastIndex(pLogStore));
|
snprintf(u64buf, sizeof(u64buf), "%ld", logStoreLastIndex(pLogStore));
|
||||||
cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
|
cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", logStoreLastTerm(pLogStore));
|
snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore));
|
||||||
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
|
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
|
||||||
|
|
||||||
cJSON* pEntries = cJSON_CreateArray();
|
cJSON* pEntries = cJSON_CreateArray();
|
||||||
|
@ -147,6 +151,7 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) {
|
||||||
cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
|
cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestory(pEntry);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot);
|
cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot);
|
||||||
|
|
|
@ -97,16 +97,32 @@ int32_t raftStorePersist(SRaftStore *pRaftStore) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool raftStoreFileExist(char *path) { return taosStatFile(path, NULL, NULL) >= 0; }
|
static bool raftStoreFileExist(char *path) {
|
||||||
|
bool b = taosStatFile(path, NULL, NULL) >= 0;
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
|
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
|
||||||
assert(pRaftStore != NULL);
|
assert(pRaftStore != NULL);
|
||||||
|
|
||||||
cJSON *pRoot = cJSON_CreateObject();
|
cJSON *pRoot = cJSON_CreateObject();
|
||||||
cJSON_AddNumberToObject(pRoot, "current_term", pRaftStore->currentTerm);
|
|
||||||
cJSON_AddNumberToObject(pRoot, "vote_for_addr", pRaftStore->voteFor.addr);
|
char u64Buf[128];
|
||||||
|
snprintf(u64Buf, sizeof(u64Buf), "%lu", pRaftStore->currentTerm);
|
||||||
|
cJSON_AddStringToObject(pRoot, "current_term", u64Buf);
|
||||||
|
|
||||||
|
snprintf(u64Buf, sizeof(u64Buf), "%lu", pRaftStore->voteFor.addr);
|
||||||
|
cJSON_AddStringToObject(pRoot, "vote_for_addr", u64Buf);
|
||||||
|
|
||||||
cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId);
|
cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId);
|
||||||
|
|
||||||
|
uint64_t u64 = pRaftStore->voteFor.addr;
|
||||||
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pRoot, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "addr_port", port);
|
||||||
|
|
||||||
char *serialized = cJSON_Print(pRoot);
|
char *serialized = cJSON_Print(pRoot);
|
||||||
int len2 = strlen(serialized);
|
int len2 = strlen(serialized);
|
||||||
assert(len2 < len);
|
assert(len2 < len);
|
||||||
|
@ -125,10 +141,12 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
|
||||||
cJSON *pRoot = cJSON_Parse(buf);
|
cJSON *pRoot = cJSON_Parse(buf);
|
||||||
|
|
||||||
cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term");
|
cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term");
|
||||||
pRaftStore->currentTerm = pCurrentTerm->valueint;
|
assert(cJSON_IsString(pCurrentTerm));
|
||||||
|
sscanf(pCurrentTerm->valuestring, "%lu", &(pRaftStore->currentTerm));
|
||||||
|
|
||||||
cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr");
|
cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr");
|
||||||
pRaftStore->voteFor.addr = pVoteForAddr->valueint;
|
assert(cJSON_IsString(pVoteForAddr));
|
||||||
|
sscanf(pVoteForAddr->valuestring, "%lu", &(pRaftStore->voteFor.addr));
|
||||||
|
|
||||||
cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid");
|
cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid");
|
||||||
pRaftStore->voteFor.vgId = pVoteForVgid->valueint;
|
pRaftStore->voteFor.vgId = pVoteForVgid->valueint;
|
||||||
|
@ -139,11 +157,10 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
|
||||||
|
|
||||||
bool raftStoreHasVoted(SRaftStore *pRaftStore) {
|
bool raftStoreHasVoted(SRaftStore *pRaftStore) {
|
||||||
bool b = syncUtilEmptyId(&(pRaftStore->voteFor));
|
bool b = syncUtilEmptyId(&(pRaftStore->voteFor));
|
||||||
return b;
|
return (!b);
|
||||||
}
|
}
|
||||||
|
|
||||||
void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId) {
|
void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId) {
|
||||||
assert(!raftStoreHasVoted(pRaftStore));
|
|
||||||
assert(!syncUtilEmptyId(pRaftId));
|
assert(!syncUtilEmptyId(pRaftId));
|
||||||
pRaftStore->voteFor = *pRaftId;
|
pRaftStore->voteFor = *pRaftId;
|
||||||
raftStorePersist(pRaftStore);
|
raftStorePersist(pRaftStore);
|
||||||
|
@ -164,30 +181,68 @@ void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term) {
|
||||||
raftStorePersist(pRaftStore);
|
raftStorePersist(pRaftStore);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson) { return 0; }
|
||||||
|
|
||||||
|
cJSON *raftStore2Json(SRaftStore *pRaftStore) {
|
||||||
|
char u64buf[128];
|
||||||
|
cJSON *pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pRaftStore != NULL) {
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pRaftStore->currentTerm);
|
||||||
|
cJSON_AddStringToObject(pRoot, "currentTerm", u64buf);
|
||||||
|
|
||||||
|
cJSON *pVoteFor = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pRaftStore->voteFor.addr);
|
||||||
|
cJSON_AddStringToObject(pVoteFor, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pRaftStore->voteFor.addr;
|
||||||
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pVoteFor, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pVoteFor, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pVoteFor, "vgId", pRaftStore->voteFor.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "voteFor", pVoteFor);
|
||||||
|
|
||||||
|
int hasVoted = raftStoreHasVoted(pRaftStore);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "hasVoted", hasVoted);
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON *pJson = cJSON_CreateObject();
|
||||||
|
cJSON_AddItemToObject(pJson, "SRaftStore", pRoot);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *raftStore2Str(SRaftStore *pRaftStore) {
|
||||||
|
cJSON *pJson = raftStore2Json(pRaftStore);
|
||||||
|
char * serialized = cJSON_Print(pJson);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
return serialized;
|
||||||
|
}
|
||||||
|
|
||||||
// for debug -------------------
|
// for debug -------------------
|
||||||
void raftStorePrint(SRaftStore *pObj) {
|
void raftStorePrint(SRaftStore *pObj) {
|
||||||
char serialized[RAFT_STORE_BLOCK_SIZE];
|
char *serialized = raftStore2Str(pObj);
|
||||||
raftStoreSerialize(pObj, serialized, sizeof(serialized));
|
|
||||||
printf("raftStorePrint | len:%lu | %s \n", strlen(serialized), serialized);
|
printf("raftStorePrint | len:%lu | %s \n", strlen(serialized), serialized);
|
||||||
fflush(NULL);
|
fflush(NULL);
|
||||||
|
free(serialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
void raftStorePrint2(char *s, SRaftStore *pObj) {
|
void raftStorePrint2(char *s, SRaftStore *pObj) {
|
||||||
char serialized[RAFT_STORE_BLOCK_SIZE];
|
char *serialized = raftStore2Str(pObj);
|
||||||
raftStoreSerialize(pObj, serialized, sizeof(serialized));
|
|
||||||
printf("raftStorePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
|
printf("raftStorePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
|
||||||
fflush(NULL);
|
fflush(NULL);
|
||||||
|
free(serialized);
|
||||||
}
|
}
|
||||||
void raftStoreLog(SRaftStore *pObj) {
|
void raftStoreLog(SRaftStore *pObj) {
|
||||||
char serialized[RAFT_STORE_BLOCK_SIZE];
|
char *serialized = raftStore2Str(pObj);
|
||||||
raftStoreSerialize(pObj, serialized, sizeof(serialized));
|
|
||||||
sTrace("raftStoreLog | len:%lu | %s", strlen(serialized), serialized);
|
sTrace("raftStoreLog | len:%lu | %s", strlen(serialized), serialized);
|
||||||
fflush(NULL);
|
free(serialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
void raftStoreLog2(char *s, SRaftStore *pObj) {
|
void raftStoreLog2(char *s, SRaftStore *pObj) {
|
||||||
char serialized[RAFT_STORE_BLOCK_SIZE];
|
char *serialized = raftStore2Str(pObj);
|
||||||
raftStoreSerialize(pObj, serialized, sizeof(serialized));
|
|
||||||
sTrace("raftStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("raftStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
fflush(NULL);
|
free(serialized);
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "syncMessage.h"
|
#include "syncMessage.h"
|
||||||
#include "syncRaftEntry.h"
|
#include "syncRaftEntry.h"
|
||||||
#include "syncRaftLog.h"
|
#include "syncRaftLog.h"
|
||||||
|
#include "syncRaftStore.h"
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
|
|
||||||
// TLA+ Spec
|
// TLA+ Spec
|
||||||
|
@ -51,32 +52,53 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||||
SRaftId* pDestId = &(pSyncNode->peersId[i]);
|
SRaftId* pDestId = &(pSyncNode->peersId[i]);
|
||||||
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
|
|
||||||
|
|
||||||
|
// set prevLogIndex
|
||||||
|
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
|
||||||
SyncIndex preLogIndex = nextIndex - 1;
|
SyncIndex preLogIndex = nextIndex - 1;
|
||||||
|
|
||||||
|
// set preLogTerm
|
||||||
SyncTerm preLogTerm = 0;
|
SyncTerm preLogTerm = 0;
|
||||||
if (preLogIndex >= SYNC_INDEX_BEGIN) {
|
if (preLogIndex >= SYNC_INDEX_BEGIN) {
|
||||||
SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preLogIndex);
|
SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preLogIndex);
|
||||||
|
assert(pPreEntry != NULL);
|
||||||
|
|
||||||
preLogTerm = pPreEntry->term;
|
preLogTerm = pPreEntry->term;
|
||||||
|
syncEntryDestory(pPreEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);
|
// batch optimized
|
||||||
assert(nextIndex == lastIndex);
|
// SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);
|
||||||
|
|
||||||
|
SyncAppendEntries* pMsg = NULL;
|
||||||
SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex);
|
SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex);
|
||||||
assert(pEntry != NULL);
|
if (pEntry != NULL) {
|
||||||
|
|
||||||
SyncAppendEntries* pMsg = syncAppendEntriesBuild(pEntry->bytes);
|
SyncAppendEntries* pMsg = syncAppendEntriesBuild(pEntry->bytes);
|
||||||
|
|
||||||
|
// add pEntry into msg
|
||||||
|
uint32_t len;
|
||||||
|
char* serialized = syncEntrySerialize(pEntry, &len);
|
||||||
|
assert(len == pEntry->bytes);
|
||||||
|
memcpy(pMsg->data, serialized, len);
|
||||||
|
|
||||||
|
free(serialized);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// maybe overflow, send empty record
|
||||||
|
SyncAppendEntries* pMsg = syncAppendEntriesBuild(0);
|
||||||
|
}
|
||||||
|
|
||||||
pMsg->srcId = pSyncNode->myRaftId;
|
pMsg->srcId = pSyncNode->myRaftId;
|
||||||
pMsg->destId = *pDestId;
|
pMsg->destId = *pDestId;
|
||||||
|
pMsg->term = pSyncNode->pRaftStore->currentTerm;
|
||||||
pMsg->prevLogIndex = preLogIndex;
|
pMsg->prevLogIndex = preLogIndex;
|
||||||
pMsg->prevLogTerm = preLogTerm;
|
pMsg->prevLogTerm = preLogTerm;
|
||||||
pMsg->commitIndex = pSyncNode->commitIndex;
|
pMsg->commitIndex = pSyncNode->commitIndex;
|
||||||
pMsg->dataLen = pEntry->bytes;
|
|
||||||
// add pEntry into msg
|
|
||||||
|
|
||||||
|
// send AppendEntries
|
||||||
syncNodeAppendEntries(pSyncNode, pDestId, pMsg);
|
syncNodeAppendEntries(pSyncNode, pDestId, pMsg);
|
||||||
|
syncAppendEntriesDestroy(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
|
|
@ -56,6 +56,8 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
|
||||||
bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK &&
|
bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK &&
|
||||||
((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId))));
|
((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId))));
|
||||||
if (grant) {
|
if (grant) {
|
||||||
|
// maybe has already voted for pMsg->srcId
|
||||||
|
// vote again, no harm
|
||||||
raftStoreVote(ths->pRaftStore, &(pMsg->srcId));
|
raftStoreVote(ths->pRaftStore, &(pMsg->srcId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -45,6 +45,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert(!(pMsg->term > ths->pRaftStore->currentTerm));
|
||||||
// no need this code, because if I receive reply.term, then I must have sent for that term.
|
// no need this code, because if I receive reply.term, then I must have sent for that term.
|
||||||
// if (pMsg->term > ths->pRaftStore->currentTerm) {
|
// if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||||
// syncNodeUpdateTerm(ths, pMsg->term);
|
// syncNodeUpdateTerm(ths, pMsg->term);
|
||||||
|
@ -52,17 +53,29 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
|
||||||
|
|
||||||
assert(pMsg->term == ths->pRaftStore->currentTerm);
|
assert(pMsg->term == ths->pRaftStore->currentTerm);
|
||||||
|
|
||||||
|
// This tallies votes even when the current state is not Candidate,
|
||||||
|
// but they won't be looked at, so it doesn't matter.
|
||||||
if (ths->state == TAOS_SYNC_STATE_CANDIDATE) {
|
if (ths->state == TAOS_SYNC_STATE_CANDIDATE) {
|
||||||
votesRespondAdd(ths->pVotesRespond, pMsg);
|
votesRespondAdd(ths->pVotesRespond, pMsg);
|
||||||
if (pMsg->voteGranted) {
|
if (pMsg->voteGranted) {
|
||||||
|
// add vote
|
||||||
voteGrantedVote(ths->pVotesGranted, pMsg);
|
voteGrantedVote(ths->pVotesGranted, pMsg);
|
||||||
|
|
||||||
|
// maybe to leader
|
||||||
if (voteGrantedMajority(ths->pVotesGranted)) {
|
if (voteGrantedMajority(ths->pVotesGranted)) {
|
||||||
if (ths->pVotesGranted->toLeader) {
|
if (!ths->pVotesGranted->toLeader) {
|
||||||
syncNodeCandidate2Leader(ths);
|
syncNodeCandidate2Leader(ths);
|
||||||
|
|
||||||
|
// prevent to leader again!
|
||||||
ths->pVotesGranted->toLeader = true;
|
ths->pVotesGranted->toLeader = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
;
|
||||||
|
// do nothing
|
||||||
|
// UNCHANGED <<votesGranted, voterLog>>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,8 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
|
||||||
if (pMsg->timeoutType == SYNC_TIMEOUT_PING) {
|
if (pMsg->timeoutType == SYNC_TIMEOUT_PING) {
|
||||||
if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) {
|
if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) {
|
||||||
++(ths->pingTimerCounter);
|
++(ths->pingTimerCounter);
|
||||||
syncNodePingAll(ths);
|
// syncNodePingAll(ths);
|
||||||
|
syncNodePingPeers(ths);
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
|
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
|
||||||
|
|
|
@ -82,6 +82,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
|
||||||
char u64buf[128];
|
char u64buf[128];
|
||||||
cJSON *pRoot = cJSON_CreateObject();
|
cJSON *pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pVotesGranted != NULL) {
|
||||||
cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesGranted->replicaNum);
|
cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesGranted->replicaNum);
|
||||||
cJSON *pReplicas = cJSON_CreateArray();
|
cJSON *pReplicas = cJSON_CreateArray();
|
||||||
cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
|
cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
|
||||||
|
@ -97,7 +98,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
|
||||||
cJSON_AddItemToObject(pRoot, "isGranted", pIsGranted);
|
cJSON_AddItemToObject(pRoot, "isGranted", pIsGranted);
|
||||||
|
|
||||||
cJSON_AddNumberToObject(pRoot, "votes", pVotesGranted->votes);
|
cJSON_AddNumberToObject(pRoot, "votes", pVotesGranted->votes);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pVotesGranted->term);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pVotesGranted->term);
|
||||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||||
cJSON_AddNumberToObject(pRoot, "quorum", pVotesGranted->quorum);
|
cJSON_AddNumberToObject(pRoot, "quorum", pVotesGranted->quorum);
|
||||||
cJSON_AddNumberToObject(pRoot, "toLeader", pVotesGranted->toLeader);
|
cJSON_AddNumberToObject(pRoot, "toLeader", pVotesGranted->toLeader);
|
||||||
|
@ -106,6 +107,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
|
||||||
|
|
||||||
bool majority = voteGrantedMajority(pVotesGranted);
|
bool majority = voteGrantedMajority(pVotesGranted);
|
||||||
cJSON_AddNumberToObject(pRoot, "majority", majority);
|
cJSON_AddNumberToObject(pRoot, "majority", majority);
|
||||||
|
}
|
||||||
|
|
||||||
cJSON *pJson = cJSON_CreateObject();
|
cJSON *pJson = cJSON_CreateObject();
|
||||||
cJSON_AddItemToObject(pJson, "SVotesGranted", pRoot);
|
cJSON_AddItemToObject(pJson, "SVotesGranted", pRoot);
|
||||||
|
@ -122,27 +124,27 @@ char *voteGranted2Str(SVotesGranted *pVotesGranted) {
|
||||||
// for debug -------------------
|
// for debug -------------------
|
||||||
void voteGrantedPrint(SVotesGranted *pObj) {
|
void voteGrantedPrint(SVotesGranted *pObj) {
|
||||||
char *serialized = voteGranted2Str(pObj);
|
char *serialized = voteGranted2Str(pObj);
|
||||||
printf("voteGrantedPrint | len:%zu | %s \n", strlen(serialized), serialized);
|
printf("voteGrantedPrint | len:%lu | %s \n", strlen(serialized), serialized);
|
||||||
fflush(NULL);
|
fflush(NULL);
|
||||||
free(serialized);
|
free(serialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
void voteGrantedPrint2(char *s, SVotesGranted *pObj) {
|
void voteGrantedPrint2(char *s, SVotesGranted *pObj) {
|
||||||
char *serialized = voteGranted2Str(pObj);
|
char *serialized = voteGranted2Str(pObj);
|
||||||
printf("voteGrantedPrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized);
|
printf("voteGrantedPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
|
||||||
fflush(NULL);
|
fflush(NULL);
|
||||||
free(serialized);
|
free(serialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
void voteGrantedLog(SVotesGranted *pObj) {
|
void voteGrantedLog(SVotesGranted *pObj) {
|
||||||
char *serialized = voteGranted2Str(pObj);
|
char *serialized = voteGranted2Str(pObj);
|
||||||
sTrace("voteGrantedLog | len:%zu | %s", strlen(serialized), serialized);
|
sTrace("voteGrantedLog | len:%lu | %s", strlen(serialized), serialized);
|
||||||
free(serialized);
|
free(serialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
void voteGrantedLog2(char *s, SVotesGranted *pObj) {
|
void voteGrantedLog2(char *s, SVotesGranted *pObj) {
|
||||||
char *serialized = voteGranted2Str(pObj);
|
char *serialized = voteGranted2Str(pObj);
|
||||||
sTrace("voteGrantedLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("voteGrantedLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
free(serialized);
|
free(serialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -203,6 +205,7 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
|
||||||
char u64buf[128];
|
char u64buf[128];
|
||||||
cJSON *pRoot = cJSON_CreateObject();
|
cJSON *pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pVotesRespond != NULL) {
|
||||||
cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesRespond->replicaNum);
|
cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesRespond->replicaNum);
|
||||||
cJSON *pReplicas = cJSON_CreateArray();
|
cJSON *pReplicas = cJSON_CreateArray();
|
||||||
cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
|
cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
|
||||||
|
@ -222,10 +225,11 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
|
||||||
cJSON_AddItemToObject(pRoot, "isRespond", pIsRespond);
|
cJSON_AddItemToObject(pRoot, "isRespond", pIsRespond);
|
||||||
cJSON_AddNumberToObject(pRoot, "respondNum", respondNum);
|
cJSON_AddNumberToObject(pRoot, "respondNum", respondNum);
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pVotesRespond->term);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pVotesRespond->term);
|
||||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%p", pVotesRespond->pSyncNode);
|
snprintf(u64buf, sizeof(u64buf), "%p", pVotesRespond->pSyncNode);
|
||||||
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
|
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
|
||||||
|
}
|
||||||
|
|
||||||
cJSON *pJson = cJSON_CreateObject();
|
cJSON *pJson = cJSON_CreateObject();
|
||||||
cJSON_AddItemToObject(pJson, "SVotesRespond", pRoot);
|
cJSON_AddItemToObject(pJson, "SVotesRespond", pRoot);
|
||||||
|
@ -242,26 +246,26 @@ char *votesRespond2Str(SVotesRespond *pVotesRespond) {
|
||||||
// for debug -------------------
|
// for debug -------------------
|
||||||
void votesRespondPrint(SVotesRespond *pObj) {
|
void votesRespondPrint(SVotesRespond *pObj) {
|
||||||
char *serialized = votesRespond2Str(pObj);
|
char *serialized = votesRespond2Str(pObj);
|
||||||
printf("votesRespondPrint | len:%zu | %s \n", strlen(serialized), serialized);
|
printf("votesRespondPrint | len:%lu | %s \n", strlen(serialized), serialized);
|
||||||
fflush(NULL);
|
fflush(NULL);
|
||||||
free(serialized);
|
free(serialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
void votesRespondPrint2(char *s, SVotesRespond *pObj) {
|
void votesRespondPrint2(char *s, SVotesRespond *pObj) {
|
||||||
char *serialized = votesRespond2Str(pObj);
|
char *serialized = votesRespond2Str(pObj);
|
||||||
printf("votesRespondPrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized);
|
printf("votesRespondPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
|
||||||
fflush(NULL);
|
fflush(NULL);
|
||||||
free(serialized);
|
free(serialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
void votesRespondLog(SVotesRespond *pObj) {
|
void votesRespondLog(SVotesRespond *pObj) {
|
||||||
char *serialized = votesRespond2Str(pObj);
|
char *serialized = votesRespond2Str(pObj);
|
||||||
sTrace("votesRespondLog | len:%zu | %s", strlen(serialized), serialized);
|
sTrace("votesRespondLog | len:%lu | %s", strlen(serialized), serialized);
|
||||||
free(serialized);
|
free(serialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
void votesRespondLog2(char *s, SVotesRespond *pObj) {
|
void votesRespondLog2(char *s, SVotesRespond *pObj) {
|
||||||
char *serialized = votesRespond2Str(pObj);
|
char *serialized = votesRespond2Str(pObj);
|
||||||
sTrace("votesRespondLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("votesRespondLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
free(serialized);
|
free(serialized);
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,9 @@ add_executable(syncTimeoutTest "")
|
||||||
add_executable(syncPingTest "")
|
add_executable(syncPingTest "")
|
||||||
add_executable(syncPingReplyTest "")
|
add_executable(syncPingReplyTest "")
|
||||||
add_executable(syncRpcMsgTest "")
|
add_executable(syncRpcMsgTest "")
|
||||||
|
add_executable(syncPingTimerTest2 "")
|
||||||
|
add_executable(syncPingSelfTest "")
|
||||||
|
add_executable(syncElectTest "")
|
||||||
|
|
||||||
|
|
||||||
target_sources(syncTest
|
target_sources(syncTest
|
||||||
|
@ -135,6 +138,18 @@ target_sources(syncRpcMsgTest
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"syncRpcMsgTest.cpp"
|
"syncRpcMsgTest.cpp"
|
||||||
)
|
)
|
||||||
|
target_sources(syncPingTimerTest2
|
||||||
|
PRIVATE
|
||||||
|
"syncPingTimerTest2.cpp"
|
||||||
|
)
|
||||||
|
target_sources(syncPingSelfTest
|
||||||
|
PRIVATE
|
||||||
|
"syncPingSelfTest.cpp"
|
||||||
|
)
|
||||||
|
target_sources(syncElectTest
|
||||||
|
PRIVATE
|
||||||
|
"syncElectTest.cpp"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
target_include_directories(syncTest
|
target_include_directories(syncTest
|
||||||
|
@ -272,6 +287,26 @@ target_include_directories(syncRpcMsgTest
|
||||||
"${CMAKE_SOURCE_DIR}/include/libs/sync"
|
"${CMAKE_SOURCE_DIR}/include/libs/sync"
|
||||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
)
|
)
|
||||||
|
target_include_directories(syncPingTimerTest2
|
||||||
|
PUBLIC
|
||||||
|
"${CMAKE_SOURCE_DIR}/include/libs/sync"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
|
target_include_directories(syncPingSelfTest
|
||||||
|
PUBLIC
|
||||||
|
"${CMAKE_SOURCE_DIR}/include/libs/sync"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
|
target_include_directories(syncElectTest
|
||||||
|
PUBLIC
|
||||||
|
"${CMAKE_SOURCE_DIR}/include/libs/sync"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
|
target_include_directories(syncElectTest
|
||||||
|
PUBLIC
|
||||||
|
"${CMAKE_SOURCE_DIR}/include/libs/sync"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
target_link_libraries(syncTest
|
target_link_libraries(syncTest
|
||||||
|
@ -382,6 +417,18 @@ target_link_libraries(syncRpcMsgTest
|
||||||
sync
|
sync
|
||||||
gtest_main
|
gtest_main
|
||||||
)
|
)
|
||||||
|
target_link_libraries(syncPingTimerTest2
|
||||||
|
sync
|
||||||
|
gtest_main
|
||||||
|
)
|
||||||
|
target_link_libraries(syncPingSelfTest
|
||||||
|
sync
|
||||||
|
gtest_main
|
||||||
|
)
|
||||||
|
target_link_libraries(syncElectTest
|
||||||
|
sync
|
||||||
|
gtest_main
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
enable_testing()
|
enable_testing()
|
||||||
|
|
|
@ -0,0 +1,124 @@
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include "syncEnv.h"
|
||||||
|
#include "syncIO.h"
|
||||||
|
#include "syncInt.h"
|
||||||
|
#include "syncRaftLog.h"
|
||||||
|
#include "syncRaftStore.h"
|
||||||
|
#include "syncUtil.h"
|
||||||
|
|
||||||
|
void logTest() {
|
||||||
|
sTrace("--- sync log test: trace");
|
||||||
|
sDebug("--- sync log test: debug");
|
||||||
|
sInfo("--- sync log test: info");
|
||||||
|
sWarn("--- sync log test: warn");
|
||||||
|
sError("--- sync log test: error");
|
||||||
|
sFatal("--- sync log test: fatal");
|
||||||
|
}
|
||||||
|
|
||||||
|
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
|
||||||
|
int32_t replicaNum = 3;
|
||||||
|
int32_t myIndex = 0;
|
||||||
|
|
||||||
|
SRaftId ids[TSDB_MAX_REPLICA];
|
||||||
|
SSyncInfo syncInfo;
|
||||||
|
SSyncFSM* pFsm;
|
||||||
|
SWal* pWal;
|
||||||
|
SSyncNode* gSyncNode;
|
||||||
|
|
||||||
|
SSyncNode* syncNodeInit() {
|
||||||
|
syncInfo.vgId = 1234;
|
||||||
|
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||||
|
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||||
|
syncInfo.queue = gSyncIO->pMsgQ;
|
||||||
|
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||||
|
syncInfo.pFsm = pFsm;
|
||||||
|
snprintf(syncInfo.path, sizeof(syncInfo.path), "./elect_test_%d", myIndex);
|
||||||
|
|
||||||
|
int code = walInit();
|
||||||
|
assert(code == 0);
|
||||||
|
SWalCfg walCfg;
|
||||||
|
memset(&walCfg, 0, sizeof(SWalCfg));
|
||||||
|
walCfg.vgId = syncInfo.vgId;
|
||||||
|
walCfg.fsyncPeriod = 1000;
|
||||||
|
walCfg.retentionPeriod = 1000;
|
||||||
|
walCfg.rollPeriod = 1000;
|
||||||
|
walCfg.retentionSize = 1000;
|
||||||
|
walCfg.segSize = 1000;
|
||||||
|
walCfg.level = TAOS_WAL_FSYNC;
|
||||||
|
|
||||||
|
char tmpdir[128];
|
||||||
|
snprintf(tmpdir, sizeof(tmpdir), "./elect_test_wal_%d", myIndex);
|
||||||
|
pWal = walOpen(tmpdir, &walCfg);
|
||||||
|
assert(pWal != NULL);
|
||||||
|
|
||||||
|
syncInfo.pWal = pWal;
|
||||||
|
|
||||||
|
SSyncCfg* pCfg = &syncInfo.syncCfg;
|
||||||
|
pCfg->myIndex = myIndex;
|
||||||
|
pCfg->replicaNum = replicaNum;
|
||||||
|
|
||||||
|
for (int i = 0; i < replicaNum; ++i) {
|
||||||
|
pCfg->nodeInfo[i].nodePort = ports[i];
|
||||||
|
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
|
||||||
|
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
|
||||||
|
assert(pSyncNode != NULL);
|
||||||
|
|
||||||
|
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||||
|
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||||
|
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||||
|
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||||
|
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||||
|
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||||
|
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||||
|
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||||
|
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||||
|
gSyncIO->pSyncNode = pSyncNode;
|
||||||
|
|
||||||
|
return pSyncNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncNode* syncInitTest() { return syncNodeInit(); }
|
||||||
|
|
||||||
|
void initRaftId(SSyncNode* pSyncNode) {
|
||||||
|
for (int i = 0; i < replicaNum; ++i) {
|
||||||
|
ids[i] = pSyncNode->replicasId[i];
|
||||||
|
char* s = syncUtilRaftId2Str(&ids[i]);
|
||||||
|
printf("raftId[%d] : %s\n", i, s);
|
||||||
|
free(s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
// taosInitLog((char *)"syncTest.log", 100000, 10);
|
||||||
|
tsAsyncLog = 0;
|
||||||
|
sDebugFlag = 143 + 64;
|
||||||
|
|
||||||
|
myIndex = 0;
|
||||||
|
if (argc >= 2) {
|
||||||
|
myIndex = atoi(argv[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
|
||||||
|
assert(ret == 0);
|
||||||
|
|
||||||
|
ret = syncEnvStart();
|
||||||
|
assert(ret == 0);
|
||||||
|
|
||||||
|
gSyncNode = syncInitTest();
|
||||||
|
assert(gSyncNode != NULL);
|
||||||
|
syncNodePrint2((char*)"", gSyncNode);
|
||||||
|
|
||||||
|
initRaftId(gSyncNode);
|
||||||
|
|
||||||
|
//---------------------------
|
||||||
|
while (1) {
|
||||||
|
sTrace("while 1 sleep, state: %d, %s", gSyncNode->state, syncUtilState2String(gSyncNode->state));
|
||||||
|
taosMsleep(1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -47,12 +47,11 @@ SSyncNode* syncNodeInit() {
|
||||||
|
|
||||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||||
|
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
|
||||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
|
||||||
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||||
gSyncIO->pSyncNode = pSyncNode;
|
gSyncIO->pSyncNode = pSyncNode;
|
||||||
|
|
||||||
|
|
|
@ -81,7 +81,7 @@ SSyncNode* syncNodeInit() {
|
||||||
SSyncNode* syncInitTest() { return syncNodeInit(); }
|
SSyncNode* syncInitTest() { return syncNodeInit(); }
|
||||||
|
|
||||||
void logStoreTest() {
|
void logStoreTest() {
|
||||||
logStorePrint2((char*)"logStoreTest2", pSyncNode->pLogStore);
|
logStorePrint2((char*)"logStoreTest", pSyncNode->pLogStore);
|
||||||
|
|
||||||
assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_INVALID);
|
assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_INVALID);
|
||||||
|
|
||||||
|
@ -105,10 +105,10 @@ void logStoreTest() {
|
||||||
assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_BEGIN);
|
assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_BEGIN);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logStorePrint(pSyncNode->pLogStore);
|
logStorePrint2((char*)"after appendEntry", pSyncNode->pLogStore);
|
||||||
|
|
||||||
pSyncNode->pLogStore->truncate(pSyncNode->pLogStore, 3);
|
pSyncNode->pLogStore->truncate(pSyncNode->pLogStore, 3);
|
||||||
logStorePrint(pSyncNode->pLogStore);
|
logStorePrint2((char*)"after truncate 3", pSyncNode->pLogStore);
|
||||||
}
|
}
|
||||||
|
|
||||||
void initRaftId(SSyncNode* pSyncNode) {
|
void initRaftId(SSyncNode* pSyncNode) {
|
||||||
|
|
|
@ -0,0 +1,102 @@
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include "syncEnv.h"
|
||||||
|
#include "syncIO.h"
|
||||||
|
#include "syncInt.h"
|
||||||
|
#include "syncRaftStore.h"
|
||||||
|
#include "syncUtil.h"
|
||||||
|
|
||||||
|
void logTest() {
|
||||||
|
sTrace("--- sync log test: trace");
|
||||||
|
sDebug("--- sync log test: debug");
|
||||||
|
sInfo("--- sync log test: info");
|
||||||
|
sWarn("--- sync log test: warn");
|
||||||
|
sError("--- sync log test: error");
|
||||||
|
sFatal("--- sync log test: fatal");
|
||||||
|
}
|
||||||
|
|
||||||
|
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
|
||||||
|
int32_t replicaNum = 3;
|
||||||
|
int32_t myIndex = 0;
|
||||||
|
|
||||||
|
SRaftId ids[TSDB_MAX_REPLICA];
|
||||||
|
SSyncInfo syncInfo;
|
||||||
|
SSyncFSM* pFsm;
|
||||||
|
|
||||||
|
SSyncNode* syncNodeInit() {
|
||||||
|
syncInfo.vgId = 1234;
|
||||||
|
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||||
|
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||||
|
syncInfo.queue = gSyncIO->pMsgQ;
|
||||||
|
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||||
|
syncInfo.pFsm = pFsm;
|
||||||
|
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
|
||||||
|
|
||||||
|
SSyncCfg* pCfg = &syncInfo.syncCfg;
|
||||||
|
pCfg->myIndex = myIndex;
|
||||||
|
pCfg->replicaNum = replicaNum;
|
||||||
|
|
||||||
|
for (int i = 0; i < replicaNum; ++i) {
|
||||||
|
pCfg->nodeInfo[i].nodePort = ports[i];
|
||||||
|
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
|
||||||
|
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
|
||||||
|
assert(pSyncNode != NULL);
|
||||||
|
|
||||||
|
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||||
|
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||||
|
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||||
|
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||||
|
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||||
|
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||||
|
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||||
|
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||||
|
gSyncIO->pSyncNode = pSyncNode;
|
||||||
|
|
||||||
|
return pSyncNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncNode* syncInitTest() { return syncNodeInit(); }
|
||||||
|
|
||||||
|
void initRaftId(SSyncNode* pSyncNode) {
|
||||||
|
for (int i = 0; i < replicaNum; ++i) {
|
||||||
|
ids[i] = pSyncNode->replicasId[i];
|
||||||
|
char* s = syncUtilRaftId2Str(&ids[i]);
|
||||||
|
printf("raftId[%d] : %s\n", i, s);
|
||||||
|
free(s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
// taosInitLog((char *)"syncTest.log", 100000, 10);
|
||||||
|
tsAsyncLog = 0;
|
||||||
|
sDebugFlag = 143 + 64;
|
||||||
|
|
||||||
|
myIndex = 0;
|
||||||
|
if (argc >= 2) {
|
||||||
|
myIndex = atoi(argv[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
|
||||||
|
assert(ret == 0);
|
||||||
|
|
||||||
|
ret = syncEnvStart();
|
||||||
|
assert(ret == 0);
|
||||||
|
|
||||||
|
SSyncNode* pSyncNode = syncInitTest();
|
||||||
|
assert(pSyncNode != NULL);
|
||||||
|
syncNodePrint2((char*)"", pSyncNode);
|
||||||
|
|
||||||
|
initRaftId(pSyncNode);
|
||||||
|
|
||||||
|
//---------------------------
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
syncNodePingSelf(pSyncNode);
|
||||||
|
taosMsleep(1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -47,6 +47,7 @@ SSyncNode* syncNodeInit() {
|
||||||
|
|
||||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||||
|
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||||
|
|
|
@ -0,0 +1,106 @@
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include "syncEnv.h"
|
||||||
|
#include "syncIO.h"
|
||||||
|
#include "syncInt.h"
|
||||||
|
#include "syncRaftStore.h"
|
||||||
|
#include "syncUtil.h"
|
||||||
|
|
||||||
|
void logTest() {
|
||||||
|
sTrace("--- sync log test: trace");
|
||||||
|
sDebug("--- sync log test: debug");
|
||||||
|
sInfo("--- sync log test: info");
|
||||||
|
sWarn("--- sync log test: warn");
|
||||||
|
sError("--- sync log test: error");
|
||||||
|
sFatal("--- sync log test: fatal");
|
||||||
|
}
|
||||||
|
|
||||||
|
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
|
||||||
|
int32_t replicaNum = 3;
|
||||||
|
int32_t myIndex = 0;
|
||||||
|
|
||||||
|
SRaftId ids[TSDB_MAX_REPLICA];
|
||||||
|
SSyncInfo syncInfo;
|
||||||
|
SSyncFSM* pFsm;
|
||||||
|
|
||||||
|
SSyncNode* syncNodeInit() {
|
||||||
|
syncInfo.vgId = 1234;
|
||||||
|
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||||
|
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||||
|
syncInfo.queue = gSyncIO->pMsgQ;
|
||||||
|
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||||
|
syncInfo.pFsm = pFsm;
|
||||||
|
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
|
||||||
|
|
||||||
|
SSyncCfg* pCfg = &syncInfo.syncCfg;
|
||||||
|
pCfg->myIndex = myIndex;
|
||||||
|
pCfg->replicaNum = replicaNum;
|
||||||
|
|
||||||
|
for (int i = 0; i < replicaNum; ++i) {
|
||||||
|
pCfg->nodeInfo[i].nodePort = ports[i];
|
||||||
|
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
|
||||||
|
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
|
||||||
|
assert(pSyncNode != NULL);
|
||||||
|
|
||||||
|
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||||
|
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||||
|
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||||
|
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||||
|
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||||
|
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||||
|
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||||
|
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||||
|
gSyncIO->pSyncNode = pSyncNode;
|
||||||
|
|
||||||
|
return pSyncNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncNode* syncInitTest() { return syncNodeInit(); }
|
||||||
|
|
||||||
|
void initRaftId(SSyncNode* pSyncNode) {
|
||||||
|
for (int i = 0; i < replicaNum; ++i) {
|
||||||
|
ids[i] = pSyncNode->replicasId[i];
|
||||||
|
char* s = syncUtilRaftId2Str(&ids[i]);
|
||||||
|
printf("raftId[%d] : %s\n", i, s);
|
||||||
|
free(s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
// taosInitLog((char *)"syncTest.log", 100000, 10);
|
||||||
|
tsAsyncLog = 0;
|
||||||
|
sDebugFlag = 143 + 64;
|
||||||
|
|
||||||
|
myIndex = 0;
|
||||||
|
if (argc >= 2) {
|
||||||
|
myIndex = atoi(argv[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
|
||||||
|
assert(ret == 0);
|
||||||
|
|
||||||
|
ret = syncEnvStart();
|
||||||
|
assert(ret == 0);
|
||||||
|
|
||||||
|
SSyncNode* pSyncNode = syncInitTest();
|
||||||
|
assert(pSyncNode != NULL);
|
||||||
|
syncNodePrint2((char*)"", pSyncNode);
|
||||||
|
|
||||||
|
initRaftId(pSyncNode);
|
||||||
|
|
||||||
|
//---------------------------
|
||||||
|
|
||||||
|
sTrace("syncNodeStartPingTimer ...");
|
||||||
|
ret = syncNodeStartPingTimer(pSyncNode);
|
||||||
|
assert(ret == 0);
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
sTrace("while 1 sleep ...");
|
||||||
|
taosMsleep(1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -3,6 +3,7 @@
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include "syncIO.h"
|
#include "syncIO.h"
|
||||||
#include "syncInt.h"
|
#include "syncInt.h"
|
||||||
|
#include "syncUtil.h"
|
||||||
|
|
||||||
void logTest() {
|
void logTest() {
|
||||||
sTrace("--- sync log test: trace");
|
sTrace("--- sync log test: trace");
|
||||||
|
@ -13,6 +14,21 @@ void logTest() {
|
||||||
sFatal("--- sync log test: fatal");
|
sFatal("--- sync log test: fatal");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
|
||||||
|
int32_t replicaNum = 5;
|
||||||
|
int32_t myIndex = 0;
|
||||||
|
SRaftId ids[TSDB_MAX_REPLICA];
|
||||||
|
|
||||||
|
void initRaftId() {
|
||||||
|
for (int i = 0; i < replicaNum; ++i) {
|
||||||
|
ids[i].addr = syncUtilAddr2U64("127.0.0.1", ports[i]);
|
||||||
|
ids[i].vgId = 1234;
|
||||||
|
char* s = syncUtilRaftId2Str(&ids[i]);
|
||||||
|
printf("raftId[%d] : %s\n", i, s);
|
||||||
|
free(s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int main() {
|
int main() {
|
||||||
// taosInitLog((char *)"syncTest.log", 100000, 10);
|
// taosInitLog((char *)"syncTest.log", 100000, 10);
|
||||||
tsAsyncLog = 0;
|
tsAsyncLog = 0;
|
||||||
|
@ -20,23 +36,35 @@ int main() {
|
||||||
|
|
||||||
logTest();
|
logTest();
|
||||||
|
|
||||||
SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json");
|
initRaftId();
|
||||||
|
|
||||||
|
SRaftStore* pRaftStore = raftStoreOpen("./test_raft_store.json");
|
||||||
assert(pRaftStore != NULL);
|
assert(pRaftStore != NULL);
|
||||||
raftStorePrint(pRaftStore);
|
raftStorePrint2((char*)"==raftStoreOpen==", pRaftStore);
|
||||||
|
|
||||||
#if 0
|
raftStoreSetTerm(pRaftStore, 100);
|
||||||
pRaftStore->currentTerm = 100;
|
raftStorePrint2((char*)"==raftStoreSetTerm==", pRaftStore);
|
||||||
pRaftStore->voteFor.addr = 200;
|
|
||||||
pRaftStore->voteFor.vgId = 300;
|
|
||||||
raftStorePersist(pRaftStore);
|
|
||||||
raftStorePrint(pRaftStore);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
++(pRaftStore->currentTerm);
|
raftStoreVote(pRaftStore, &ids[0]);
|
||||||
++(pRaftStore->voteFor.addr);
|
raftStorePrint2((char*)"==raftStoreVote==", pRaftStore);
|
||||||
++(pRaftStore->voteFor.vgId);
|
|
||||||
raftStorePersist(pRaftStore);
|
raftStoreClearVote(pRaftStore);
|
||||||
raftStorePrint(pRaftStore);
|
raftStorePrint2((char*)"==raftStoreClearVote==", pRaftStore);
|
||||||
|
|
||||||
|
raftStoreVote(pRaftStore, &ids[1]);
|
||||||
|
raftStorePrint2((char*)"==raftStoreVote==", pRaftStore);
|
||||||
|
|
||||||
|
raftStoreNextTerm(pRaftStore);
|
||||||
|
raftStorePrint2((char*)"==raftStoreNextTerm==", pRaftStore);
|
||||||
|
|
||||||
|
raftStoreNextTerm(pRaftStore);
|
||||||
|
raftStorePrint2((char*)"==raftStoreNextTerm==", pRaftStore);
|
||||||
|
|
||||||
|
raftStoreNextTerm(pRaftStore);
|
||||||
|
raftStorePrint2((char*)"==raftStoreNextTerm==", pRaftStore);
|
||||||
|
|
||||||
|
raftStoreNextTerm(pRaftStore);
|
||||||
|
raftStorePrint2((char*)"==raftStoreNextTerm==", pRaftStore);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue