diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 40ff79287b..66c7c8620d 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -31,10 +31,10 @@ extern "C" { #define TIMER_MAX_MS 0x7FFFFFFF #define ENV_TICK_TIMER_MS 1000 #define PING_TIMER_MS 1000 -#define ELECT_TIMER_MS_MIN 150 -#define ELECT_TIMER_MS_MAX 300 +#define ELECT_TIMER_MS_MIN 1500 +#define ELECT_TIMER_MS_MAX 3000 #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) -#define HEARTBEAT_TIMER_MS 30 +#define HEARTBEAT_TIMER_MS 300 #define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 09e93fda1c..5a4b7555bf 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -34,11 +34,11 @@ extern "C" { typedef struct SSyncIO { STaosQueue *pMsgQ; - STaosQset *pQset; + STaosQset * pQset; pthread_t consumerTid; - void *serverRpc; - void *clientRpc; + void * serverRpc; + void * clientRpc; SEpSet myAddr; tmr_h qTimer; diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index 30f7c5d9f7..355a08ac84 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -43,11 +43,14 @@ int32_t raftStorePersist(SRaftStore *pRaftStore); int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len); int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len); -bool raftStoreHasVoted(SRaftStore *pRaftStore); -void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId); -void raftStoreClearVote(SRaftStore *pRaftStore); -void raftStoreNextTerm(SRaftStore *pRaftStore); -void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term); +bool raftStoreHasVoted(SRaftStore *pRaftStore); +void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId); +void raftStoreClearVote(SRaftStore *pRaftStore); +void raftStoreNextTerm(SRaftStore *pRaftStore); +void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term); +int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson); +cJSON * raftStore2Json(SRaftStore *pRaftStore); +char * raftStore2Str(SRaftStore *pRaftStore); // for debug ------------------- void raftStorePrint(SRaftStore *pObj); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 87d6669f59..888d7e16d1 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -102,7 +102,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SyncTerm localPreLogTerm = 0; if (pMsg->prevLogTerm >= SYNC_INDEX_BEGIN && pMsg->prevLogTerm <= ths->pLogStore->getLastIndex(ths->pLogStore)) { - SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogTerm); + SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex); assert(pEntry != NULL); localPreLogTerm = pEntry->term; syncEntryDestory(pEntry); @@ -111,9 +111,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { bool logOK = (pMsg->prevLogIndex == SYNC_INDEX_INVALID) || ((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) && - (pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogIndex == localPreLogTerm)); + (pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm)); - // reject + // reject request if ((pMsg->term < ths->pRaftStore->currentTerm) || ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) { SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); @@ -134,6 +134,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // return to follower state if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { syncNodeBecomeFollower(ths); + + // need ret? + return ret; } // accept request @@ -144,17 +147,17 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { matchSuccess = true; } if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { - SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogTerm); - assert(pEntry != NULL); - if (pMsg->prevLogTerm == pEntry->term) { + SSyncRaftEntry* pPreEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex); + assert(pPreEntry != NULL); + if (pMsg->prevLogTerm == pPreEntry->term) { matchSuccess = true; } - syncEntryDestory(pEntry); + syncEntryDestory(pPreEntry); } if (matchSuccess) { // delete conflict entries - if (ths->pLogStore->getLastIndex(ths->pLogStore) > pMsg->prevLogIndex) { + if (pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore)) { SyncIndex fromIndex = pMsg->prevLogIndex + 1; ths->pLogStore->truncate(ths->pLogStore, fromIndex); } @@ -178,6 +181,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); syncAppendEntriesReplyDestroy(pReply); + } else { SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); pReply->srcId = ths->myRaftId; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index a75e85601f..f9630f7cfe 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -51,10 +51,10 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p assert(pMsg->term == ths->pRaftStore->currentTerm); if (pMsg->success) { - // nextIndex = reply.matchIndex + 1 + // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); - // matchIndex = reply.matchIndex + // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); // maybe commit @@ -62,6 +62,8 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p } else { SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); + + // notice! int64, uint64 if (nextIndex > SYNC_INDEX_BEGIN) { --nextIndex; } else { diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c new file mode 100644 index 0000000000..850468f393 --- /dev/null +++ b/source/libs/sync/src/syncCommit.c @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "syncIndexMgr.h" +#include "syncInt.h" + +// \* Leader i advances its commitIndex. +// \* This is done as a separate step from handling AppendEntries responses, +// \* in part to minimize atomic regions, and in part so that leaders of +// \* single-server clusters are able to mark entries committed. +// AdvanceCommitIndex(i) == +// /\ state[i] = Leader +// /\ LET \* The set of servers that agree up through index. +// Agree(index) == {i} \cup {k \in Server : +// matchIndex[i][k] >= index} +// \* The maximum indexes for which a quorum agrees +// agreeIndexes == {index \in 1..Len(log[i]) : +// Agree(index) \in Quorum} +// \* New value for commitIndex'[i] +// newCommitIndex == +// IF /\ agreeIndexes /= {} +// /\ log[i][Max(agreeIndexes)].term = currentTerm[i] +// THEN +// Max(agreeIndexes) +// ELSE +// commitIndex[i] +// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex] +// /\ UNCHANGED <> +// +void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { + syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex); + syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex); +} \ No newline at end of file diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 77c3d07698..6ae70689ef 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -50,6 +50,7 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) { } int32_t syncNodeElect(SSyncNode* pSyncNode) { + int32_t ret = 0; if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { syncNodeFollower2Candidate(pSyncNode); } @@ -62,7 +63,15 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { votesRespondReset(pSyncNode->pVotesRespond, pSyncNode->pRaftStore->currentTerm); syncNodeVoteForSelf(pSyncNode); - int32_t ret = syncNodeRequestVotePeers(pSyncNode); + if (voteGrantedMajority(pSyncNode->pVotesGranted)) { + // only myself, to leader + assert(!pSyncNode->pVotesGranted->toLeader); + syncNodeCandidate2Leader(pSyncNode); + pSyncNode->pVotesGranted->toLeader = true; + return ret; + } + + ret = syncNodeRequestVotePeers(pSyncNode); assert(ret == 0); syncNodeResetElectTimer(pSyncNode); diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 8176ac417a..c8448c32eb 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -269,6 +269,7 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { if (io->FpOnSyncPingReply != NULL) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); syncPingReplyDestroy(pSyncMsg); } @@ -276,6 +277,7 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) { if (io->FpOnSyncClientRequest != NULL) { SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); io->FpOnSyncClientRequest(io->pSyncNode, pSyncMsg); syncClientRequestDestroy(pSyncMsg); } @@ -283,6 +285,7 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) { if (io->FpOnSyncRequestVote != NULL) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); io->FpOnSyncRequestVote(io->pSyncNode, pSyncMsg); syncRequestVoteDestroy(pSyncMsg); } @@ -290,6 +293,7 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) { if (io->FpOnSyncRequestVoteReply != NULL) { SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg); } @@ -297,6 +301,7 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) { if (io->FpOnSyncAppendEntries != NULL) { SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); io->FpOnSyncAppendEntries(io->pSyncNode, pSyncMsg); syncAppendEntriesDestroy(pSyncMsg); } @@ -304,6 +309,7 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) { if (io->FpOnSyncAppendEntriesReply != NULL) { SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); io->FpOnSyncAppendEntriesReply(io->pSyncNode, pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); } @@ -311,6 +317,7 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == SYNC_TIMEOUT) { if (io->FpOnSyncTimeout != NULL) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg); syncTimeoutDestroy(pSyncMsg); } diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index 9567938197..aa97104180 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -70,22 +70,24 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) { char u64buf[128]; cJSON *pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncIndexMgr->replicaNum); - cJSON *pReplicas = cJSON_CreateArray(); - cJSON_AddItemToObject(pRoot, "replicas", pReplicas); - for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { - cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pSyncIndexMgr->replicas))[i])); + if (pSyncIndexMgr != NULL) { + cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncIndexMgr->replicaNum); + cJSON *pReplicas = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "replicas", pReplicas); + for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { + cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pSyncIndexMgr->replicas))[i])); + } + int respondNum = 0; + int *arr = (int *)malloc(sizeof(int) * pSyncIndexMgr->replicaNum); + for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { + arr[i] = pSyncIndexMgr->index[i]; + } + cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum); + free(arr); + cJSON_AddItemToObject(pRoot, "index", pIndex); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncIndexMgr->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); } - int respondNum = 0; - int *arr = (int *)malloc(sizeof(int) * pSyncIndexMgr->replicaNum); - for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { - arr[i] = pSyncIndexMgr->index[i]; - } - cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum); - free(arr); - cJSON_AddItemToObject(pRoot, "index", pIndex); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncIndexMgr->pSyncNode); - cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); cJSON *pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "pSyncIndexMgr", pRoot); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b919e014a0..bd1b4213c0 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -103,6 +103,12 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { assert(pSyncNode != NULL); memset(pSyncNode, 0, sizeof(SSyncNode)); + if (taosMkDir(pSyncInfo->path) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr()); + return NULL; + } + // init by SSyncInfo pSyncNode->vgId = pSyncInfo->vgId; pSyncNode->syncCfg = pSyncInfo->syncCfg; @@ -200,6 +206,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb; pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb; + // start raft + syncNodeBecomeFollower(pSyncNode); + return pSyncNode; } @@ -355,128 +364,130 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { char u64buf[128]; cJSON* pRoot = cJSON_CreateObject(); - // init by SSyncInfo - cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId); - cJSON_AddStringToObject(pRoot, "path", pSyncNode->path); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal); - cJSON_AddStringToObject(pRoot, "pWal", u64buf); + if (pSyncNode != NULL) { + // init by SSyncInfo + cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId); + cJSON_AddStringToObject(pRoot, "path", pSyncNode->path); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal); + cJSON_AddStringToObject(pRoot, "pWal", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->rpcClient); - cJSON_AddStringToObject(pRoot, "rpcClient", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg); - cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->rpcClient); + cJSON_AddStringToObject(pRoot, "rpcClient", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg); + cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->queue); - cJSON_AddStringToObject(pRoot, "queue", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg); - cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->queue); + cJSON_AddStringToObject(pRoot, "queue", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg); + cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf); - // init internal - cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo); - cJSON_AddItemToObject(pRoot, "myNodeInfo", pMe); - cJSON* pRaftId = syncUtilRaftId2Json(&pSyncNode->myRaftId); - cJSON_AddItemToObject(pRoot, "myRaftId", pRaftId); + // init internal + cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo); + cJSON_AddItemToObject(pRoot, "myNodeInfo", pMe); + cJSON* pRaftId = syncUtilRaftId2Json(&pSyncNode->myRaftId); + cJSON_AddItemToObject(pRoot, "myRaftId", pRaftId); - cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum); - cJSON* pPeers = cJSON_CreateArray(); - cJSON_AddItemToObject(pRoot, "peersNodeInfo", pPeers); - for (int i = 0; i < pSyncNode->peersNum; ++i) { - cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peersNodeInfo[i])); + cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum); + cJSON* pPeers = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "peersNodeInfo", pPeers); + for (int i = 0; i < pSyncNode->peersNum; ++i) { + cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peersNodeInfo[i])); + } + cJSON* pPeersId = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "peersId", pPeersId); + for (int i = 0; i < pSyncNode->peersNum; ++i) { + cJSON_AddItemToArray(pPeersId, syncUtilRaftId2Json(&pSyncNode->peersId[i])); + } + + cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncNode->replicaNum); + cJSON* pReplicasId = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "replicasId", pReplicasId); + for (int i = 0; i < pSyncNode->replicaNum; ++i) { + cJSON_AddItemToArray(pReplicasId, syncUtilRaftId2Json(&pSyncNode->replicasId[i])); + } + + // raft algorithm + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pFsm); + cJSON_AddStringToObject(pRoot, "pFsm", u64buf); + cJSON_AddNumberToObject(pRoot, "quorum", pSyncNode->quorum); + cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache); + cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache); + + // tla+ server vars + cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state); + cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state)); + char tmpBuf[RAFT_STORE_BLOCK_SIZE]; + raftStoreSerialize(pSyncNode->pRaftStore, tmpBuf, sizeof(tmpBuf)); + cJSON_AddStringToObject(pRoot, "pRaftStore", tmpBuf); + + // tla+ candidate vars + cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted)); + cJSON_AddItemToObject(pRoot, "pVotesRespond", votesRespond2Json(pSyncNode->pVotesRespond)); + + // tla+ leader vars + cJSON_AddItemToObject(pRoot, "pNextIndex", syncIndexMgr2Json(pSyncNode->pNextIndex)); + cJSON_AddItemToObject(pRoot, "pMatchIndex", syncIndexMgr2Json(pSyncNode->pMatchIndex)); + + // tla+ log vars + cJSON_AddItemToObject(pRoot, "pLogStore", logStore2Json(pSyncNode->pLogStore)); + snprintf(u64buf, sizeof(u64buf), "%" PRId64 "", pSyncNode->commitIndex); + cJSON_AddStringToObject(pRoot, "commitIndex", u64buf); + + // ping timer + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer); + cJSON_AddStringToObject(pRoot, "pPingTimer", u64buf); + cJSON_AddNumberToObject(pRoot, "pingTimerMS", pSyncNode->pingTimerMS); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClock); + cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClockUser); + cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimerCB); + cJSON_AddStringToObject(pRoot, "FpPingTimerCB", u64buf); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerCounter); + cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf); + + // elect timer + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pElectTimer); + cJSON_AddStringToObject(pRoot, "pElectTimer", u64buf); + cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClock); + cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClockUser); + cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB); + cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerCounter); + cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf); + + // heartbeat timer + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pHeartbeatTimer); + cJSON_AddStringToObject(pRoot, "pHeartbeatTimer", u64buf); + cJSON_AddNumberToObject(pRoot, "heartbeatTimerMS", pSyncNode->heartbeatTimerMS); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClock); + cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClockUser); + cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimerCB); + cJSON_AddStringToObject(pRoot, "FpHeartbeatTimerCB", u64buf); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerCounter); + cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf); + + // callback + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPing); + cJSON_AddStringToObject(pRoot, "FpOnPing", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPingReply); + cJSON_AddStringToObject(pRoot, "FpOnPingReply", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVote); + cJSON_AddStringToObject(pRoot, "FpOnRequestVote", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVoteReply); + cJSON_AddStringToObject(pRoot, "FpOnRequestVoteReply", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntries); + cJSON_AddStringToObject(pRoot, "FpOnAppendEntries", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntriesReply); + cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout); + cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf); } - cJSON* pPeersId = cJSON_CreateArray(); - cJSON_AddItemToObject(pRoot, "peersId", pPeersId); - for (int i = 0; i < pSyncNode->peersNum; ++i) { - cJSON_AddItemToArray(pPeersId, syncUtilRaftId2Json(&pSyncNode->peersId[i])); - } - - cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncNode->replicaNum); - cJSON* pReplicasId = cJSON_CreateArray(); - cJSON_AddItemToObject(pRoot, "replicasId", pReplicasId); - for (int i = 0; i < pSyncNode->replicaNum; ++i) { - cJSON_AddItemToArray(pReplicasId, syncUtilRaftId2Json(&pSyncNode->replicasId[i])); - } - - // raft algorithm - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pFsm); - cJSON_AddStringToObject(pRoot, "pFsm", u64buf); - cJSON_AddNumberToObject(pRoot, "quorum", pSyncNode->quorum); - cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache); - cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache); - - // tla+ server vars - cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state); - cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state)); - char tmpBuf[RAFT_STORE_BLOCK_SIZE]; - raftStoreSerialize(pSyncNode->pRaftStore, tmpBuf, sizeof(tmpBuf)); - cJSON_AddStringToObject(pRoot, "pRaftStore", tmpBuf); - - // tla+ candidate vars - cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted)); - cJSON_AddItemToObject(pRoot, "pVotesRespond", votesRespond2Json(pSyncNode->pVotesRespond)); - - // tla+ leader vars - cJSON_AddItemToObject(pRoot, "pNextIndex", syncIndexMgr2Json(pSyncNode->pNextIndex)); - cJSON_AddItemToObject(pRoot, "pMatchIndex", syncIndexMgr2Json(pSyncNode->pMatchIndex)); - - // tla+ log vars - cJSON_AddItemToObject(pRoot, "pLogStore", logStore2Json(pSyncNode->pLogStore)); - snprintf(u64buf, sizeof(u64buf), "%" PRId64 "", pSyncNode->commitIndex); - cJSON_AddStringToObject(pRoot, "commitIndex", u64buf); - - // ping timer - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer); - cJSON_AddStringToObject(pRoot, "pPingTimer", u64buf); - cJSON_AddNumberToObject(pRoot, "pingTimerMS", pSyncNode->pingTimerMS); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClock); - cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClockUser); - cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimerCB); - cJSON_AddStringToObject(pRoot, "FpPingTimerCB", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerCounter); - cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf); - - // elect timer - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pElectTimer); - cJSON_AddStringToObject(pRoot, "pElectTimer", u64buf); - cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClock); - cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClockUser); - cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB); - cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerCounter); - cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf); - - // heartbeat timer - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pHeartbeatTimer); - cJSON_AddStringToObject(pRoot, "pHeartbeatTimer", u64buf); - cJSON_AddNumberToObject(pRoot, "heartbeatTimerMS", pSyncNode->heartbeatTimerMS); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClock); - cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClockUser); - cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimerCB); - cJSON_AddStringToObject(pRoot, "FpHeartbeatTimerCB", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerCounter); - cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf); - - // callback - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPing); - cJSON_AddStringToObject(pRoot, "FpOnPing", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPingReply); - cJSON_AddStringToObject(pRoot, "FpOnPingReply", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVote); - cJSON_AddStringToObject(pRoot, "FpOnRequestVote", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVoteReply); - cJSON_AddStringToObject(pRoot, "FpOnRequestVoteReply", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntries); - cJSON_AddStringToObject(pRoot, "FpOnAppendEntries", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntriesReply); - cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout); - cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf); cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SSyncNode", pRoot); @@ -500,15 +511,17 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { } void syncNodeBecomeFollower(SSyncNode* pSyncNode) { + // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { pSyncNode->leaderCache = EMPTY_RAFT_ID; } + // state change pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; syncNodeStopHeartbeatTimer(pSyncNode); - int32_t electMS = syncUtilElectRandomMS(); - syncNodeRestartElectTimer(pSyncNode, electMS); + // reset elect timer + syncNodeResetElectTimer(pSyncNode); } // TLA+ Spec @@ -530,20 +543,32 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode) { // /\ UNCHANGED <> // void syncNodeBecomeLeader(SSyncNode* pSyncNode) { + // state change pSyncNode->state = TAOS_SYNC_STATE_LEADER; + + // set leader cache pSyncNode->leaderCache = pSyncNode->myRaftId; for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) { + // maybe overwrite myself, no harm + // just do it! pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1; } for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) { + // maybe overwrite myself, no harm + // just do it! pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID; } + // stop elect timer syncNodeStopElectTimer(pSyncNode); - syncNodeStartHeartbeatTimer(pSyncNode); + + // start replicate right now! syncNodeReplicate(pSyncNode); + + // start heartbeat timer + syncNodeStartHeartbeatTimer(pSyncNode); } void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { @@ -568,6 +593,9 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { } // raft vote -------------- + +// just called by syncNodeVoteForSelf +// need assert void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) { assert(term == pSyncNode->pRaftStore->currentTerm); assert(!raftStoreHasVoted(pSyncNode->pRaftStore)); @@ -575,6 +603,7 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) raftStoreVote(pSyncNode->pRaftStore, pRaftId); } +// simulate get vote from outside void syncNodeVoteForSelf(SSyncNode* pSyncNode) { syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId)); @@ -589,8 +618,6 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) { syncRequestVoteReplyDestroy(pMsg); } -void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {} - // for debug -------------- void syncNodePrint(SSyncNode* pObj) { char* serialized = syncNode2Str(pObj); @@ -676,7 +703,8 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pHeartbeatTimer); } else { - sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64 "", + sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64 + "", pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); } } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index e28f780831..453ac13ce3 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -212,17 +212,19 @@ SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg) { } cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { - char u64buf[128]; - + char u64buf[128]; cJSON* pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->logicClock); - cJSON_AddStringToObject(pRoot, "logicClock", u64buf); - cJSON_AddNumberToObject(pRoot, "timerMS", pMsg->timerMS); - snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data); - cJSON_AddStringToObject(pRoot, "data", u64buf); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->logicClock); + cJSON_AddStringToObject(pRoot, "logicClock", u64buf); + cJSON_AddNumberToObject(pRoot, "timerMS", pMsg->timerMS); + snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data); + cJSON_AddStringToObject(pRoot, "data", u64buf); + } cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncTimeout", pRoot); @@ -342,50 +344,52 @@ SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) { } cJSON* syncPing2Json(const SyncPing* pMsg) { - char u64buf[128]; - + char u64buf[128]; cJSON* pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr); - cJSON_AddStringToObject(pSrcId, "addr", u64buf); - { - uint64_t u64 = pMsg->srcId.addr; - cJSON* pTmp = pSrcId; - char host[128]; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + char* s; + s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + free(s); + s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + free(s); } - cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); - cJSON_AddItemToObject(pRoot, "srcId", pSrcId); - - cJSON* pDestId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->destId.addr); - cJSON_AddStringToObject(pDestId, "addr", u64buf); - { - uint64_t u64 = pMsg->destId.addr; - cJSON* pTmp = pDestId; - char host[128]; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); - cJSON_AddItemToObject(pRoot, "destId", pDestId); - - cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); - char* s; - s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data", s); - free(s); - s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data2", s); - free(s); cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncPing", pRoot); @@ -505,50 +509,52 @@ SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { } cJSON* syncPingReply2Json(const SyncPingReply* pMsg) { - char u64buf[128]; - + char u64buf[128]; cJSON* pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr); - cJSON_AddStringToObject(pSrcId, "addr", u64buf); - { - uint64_t u64 = pMsg->srcId.addr; - cJSON* pTmp = pSrcId; - char host[128]; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + char* s; + s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + free(s); + s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + free(s); } - cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); - cJSON_AddItemToObject(pRoot, "srcId", pSrcId); - - cJSON* pDestId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->destId.addr); - cJSON_AddStringToObject(pDestId, "addr", u64buf); - { - uint64_t u64 = pMsg->destId.addr; - cJSON* pTmp = pDestId; - char host[128]; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); - cJSON_AddItemToObject(pRoot, "destId", pDestId); - - cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); - char* s; - s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data", s); - free(s); - s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data2", s); - free(s); cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncPingReply", pRoot); @@ -664,24 +670,26 @@ SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg) { } cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) { - char u64buf[128]; - + char u64buf[128]; cJSON* pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->seqNum); - cJSON_AddStringToObject(pRoot, "seqNum", u64buf); - cJSON_AddNumberToObject(pRoot, "isWeak", pMsg->isWeak); - cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); - char* s; - s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data", s); - free(s); - s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data2", s); - free(s); + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->seqNum); + cJSON_AddStringToObject(pRoot, "seqNum", u64buf); + cJSON_AddNumberToObject(pRoot, "isWeak", pMsg->isWeak); + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + + char* s; + s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + free(s); + s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + free(s); + } cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncClientRequest", pRoot); @@ -785,47 +793,49 @@ SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg) { } cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) { - char u64buf[128]; - + char u64buf[128]; cJSON* pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr); - cJSON_AddStringToObject(pSrcId, "addr", u64buf); - { - uint64_t u64 = pMsg->srcId.addr; - cJSON* pTmp = pSrcId; - char host[128]; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogIndex); + cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogTerm); + cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf); } - cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); - cJSON_AddItemToObject(pRoot, "srcId", pSrcId); - - cJSON* pDestId = cJSON_CreateObject(); - cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); - { - uint64_t u64 = pMsg->destId.addr; - cJSON* pTmp = pDestId; - char host[128]; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); - cJSON_AddItemToObject(pRoot, "destId", pDestId); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->term); - cJSON_AddStringToObject(pRoot, "term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->lastLogIndex); - cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->lastLogTerm); - cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf); cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncRequestVote", pRoot); @@ -929,44 +939,46 @@ SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { } cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) { - char u64buf[128]; - + char u64buf[128]; cJSON* pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr); - cJSON_AddStringToObject(pSrcId, "addr", u64buf); - { - uint64_t u64 = pMsg->srcId.addr; - cJSON* pTmp = pSrcId; - char host[128]; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + cJSON_AddNumberToObject(pRoot, "vote_granted", pMsg->voteGranted); } - cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); - cJSON_AddItemToObject(pRoot, "srcId", pSrcId); - - cJSON* pDestId = cJSON_CreateObject(); - cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); - { - uint64_t u64 = pMsg->destId.addr; - cJSON* pTmp = pDestId; - char host[128]; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); - cJSON_AddItemToObject(pRoot, "destId", pDestId); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->term); - cJSON_AddStringToObject(pRoot, "term", u64buf); - cJSON_AddNumberToObject(pRoot, "vote_granted", pMsg->voteGranted); cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncRequestVoteReply", pRoot); @@ -1072,62 +1084,64 @@ SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg) { } cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) { - char u64buf[128]; - + char u64buf[128]; cJSON* pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr); - cJSON_AddStringToObject(pSrcId, "addr", u64buf); - { - uint64_t u64 = pMsg->srcId.addr; - cJSON* pTmp = pSrcId; - char host[128]; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogIndex); + cJSON_AddStringToObject(pRoot, "pre_log_index", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogTerm); + cJSON_AddStringToObject(pRoot, "pre_log_term", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->commitIndex); + cJSON_AddStringToObject(pRoot, "commit_index", u64buf); + + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + char* s; + s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + free(s); + s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + free(s); } - cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); - cJSON_AddItemToObject(pRoot, "srcId", pSrcId); - - cJSON* pDestId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->destId.addr); - cJSON_AddStringToObject(pDestId, "addr", u64buf); - { - uint64_t u64 = pMsg->destId.addr; - cJSON* pTmp = pDestId; - char host[128]; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); - cJSON_AddItemToObject(pRoot, "destId", pDestId); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->term); - cJSON_AddStringToObject(pRoot, "term", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->prevLogIndex); - cJSON_AddStringToObject(pRoot, "pre_log_index", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->prevLogTerm); - cJSON_AddStringToObject(pRoot, "pre_log_term", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->commitIndex); - cJSON_AddStringToObject(pRoot, "commit_index", u64buf); - - cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); - char* s; - s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data", s); - free(s); - s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data2", s); - free(s); cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncAppendEntries", pRoot); @@ -1231,47 +1245,49 @@ SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg } cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { - char u64buf[128]; - + char u64buf[128]; cJSON* pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr); - cJSON_AddStringToObject(pSrcId, "addr", u64buf); - { - uint64_t u64 = pMsg->srcId.addr; - cJSON* pTmp = pSrcId; - char host[128]; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + cJSON_AddNumberToObject(pRoot, "success", pMsg->success); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->matchIndex); + cJSON_AddStringToObject(pRoot, "matchIndex", u64buf); } - cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); - cJSON_AddItemToObject(pRoot, "srcId", pSrcId); - - cJSON* pDestId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->destId.addr); - cJSON_AddStringToObject(pDestId, "addr", u64buf); - { - uint64_t u64 = pMsg->destId.addr; - cJSON* pTmp = pDestId; - char host[128]; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); - cJSON_AddItemToObject(pRoot, "destId", pDestId); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->term); - cJSON_AddStringToObject(pRoot, "term", u64buf); - cJSON_AddNumberToObject(pRoot, "success", pMsg->success); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->matchIndex); - cJSON_AddStringToObject(pRoot, "matchIndex", u64buf); cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncAppendEntriesReply", pRoot); diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 7ececf285a..2589c3a5e7 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -68,29 +68,31 @@ SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len) { } cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) { - char u64buf[128]; - + char u64buf[128]; cJSON* pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "bytes", pEntry->bytes); - cJSON_AddNumberToObject(pRoot, "msgType", pEntry->msgType); - cJSON_AddNumberToObject(pRoot, "originalRpcType", pEntry->originalRpcType); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pEntry->seqNum); - cJSON_AddStringToObject(pRoot, "seqNum", u64buf); - cJSON_AddNumberToObject(pRoot, "isWeak", pEntry->isWeak); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pEntry->term); - cJSON_AddStringToObject(pRoot, "term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pEntry->index); - cJSON_AddStringToObject(pRoot, "index", u64buf); - cJSON_AddNumberToObject(pRoot, "dataLen", pEntry->dataLen); - char* s; - s = syncUtilprintBin((char*)(pEntry->data), pEntry->dataLen); - cJSON_AddStringToObject(pRoot, "data", s); - free(s); + if (pEntry != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pEntry->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pEntry->msgType); + cJSON_AddNumberToObject(pRoot, "originalRpcType", pEntry->originalRpcType); + snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->seqNum); + cJSON_AddStringToObject(pRoot, "seqNum", u64buf); + cJSON_AddNumberToObject(pRoot, "isWeak", pEntry->isWeak); + snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->index); + cJSON_AddStringToObject(pRoot, "index", u64buf); + cJSON_AddNumberToObject(pRoot, "dataLen", pEntry->dataLen); - s = syncUtilprintBin2((char*)(pEntry->data), pEntry->dataLen); - cJSON_AddStringToObject(pRoot, "data2", s); - free(s); + char* s; + s = syncUtilprintBin((char*)(pEntry->data), pEntry->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + free(s); + + s = syncUtilprintBin2((char*)(pEntry->data), pEntry->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + free(s); + } cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SSyncRaftEntry", pRoot); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 3ae2fc4721..620e923629 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -34,7 +34,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { pLogStore->getLastTerm = logStoreLastTerm; pLogStore->updateCommitIndex = logStoreUpdateCommitIndex; pLogStore->getCommitIndex = logStoreGetCommitIndex; - return pLogStore; // to avoid compiler error + return pLogStore; // to avoid compiler error } void logStoreDestory(SSyncLogStore* pLogStore) { @@ -59,21 +59,24 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { walFsync(pWal, true); free(serialized); - return code; // to avoid compiler error + return code; // to avoid compiler error } SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - SSyncRaftEntry* pEntry; + SSyncRaftEntry* pEntry = NULL; - SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); - walReadWithHandle(pWalHandle, index); - pEntry = syncEntryDeserialize(pWalHandle->pHead->head.body, pWalHandle->pHead->head.len); - assert(pEntry != NULL); + if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) { + SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); + walReadWithHandle(pWalHandle, index); + pEntry = syncEntryDeserialize(pWalHandle->pHead->head.body, pWalHandle->pHead->head.len); + assert(pEntry != NULL); + + // need to hold, do not new every time!! + walCloseReadHandle(pWalHandle); + } - // need to hold, do not new every time!! - walCloseReadHandle(pWalHandle); return pEntry; } @@ -81,7 +84,7 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; walRollback(pWal, fromIndex); - return 0; // to avoid compiler error + return 0; // to avoid compiler error } SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) { @@ -105,7 +108,7 @@ int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; walCommit(pWal, index); - return 0; // to avoid compiler error + return 0; // to avoid compiler error } SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) { @@ -126,26 +129,28 @@ SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) { } cJSON* logStore2Json(SSyncLogStore* pLogStore) { - char u64buf[128]; - + char u64buf[128]; SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data; cJSON* pRoot = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode); - cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal); - cJSON_AddStringToObject(pRoot, "pWal", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRId64 "", logStoreLastIndex(pLogStore)); - cJSON_AddStringToObject(pRoot, "LastIndex", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", logStoreLastTerm(pLogStore)); - cJSON_AddStringToObject(pRoot, "LastTerm", u64buf); - cJSON* pEntries = cJSON_CreateArray(); - cJSON_AddItemToObject(pRoot, "pEntries", pEntries); - SyncIndex lastIndex = logStoreLastIndex(pLogStore); - for (SyncIndex i = 0; i <= lastIndex; ++i) { - SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i); - cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry)); - syncEntryDestory(pEntry); + if (pData != NULL && pData->pWal != NULL) { + snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal); + cJSON_AddStringToObject(pRoot, "pWal", u64buf); + snprintf(u64buf, sizeof(u64buf), "%ld", logStoreLastIndex(pLogStore)); + cJSON_AddStringToObject(pRoot, "LastIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore)); + cJSON_AddStringToObject(pRoot, "LastTerm", u64buf); + + cJSON* pEntries = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "pEntries", pEntries); + SyncIndex lastIndex = logStoreLastIndex(pLogStore); + for (SyncIndex i = 0; i <= lastIndex; ++i) { + SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i); + cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry)); + syncEntryDestory(pEntry); + } } cJSON* pJson = cJSON_CreateObject(); diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 5ad618b9c0..3f6db129ce 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -97,16 +97,32 @@ int32_t raftStorePersist(SRaftStore *pRaftStore) { return 0; } -static bool raftStoreFileExist(char *path) { return taosStatFile(path, NULL, NULL) >= 0; } +static bool raftStoreFileExist(char *path) { + bool b = taosStatFile(path, NULL, NULL) >= 0; + return b; +} int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { assert(pRaftStore != NULL); cJSON *pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "current_term", pRaftStore->currentTerm); - cJSON_AddNumberToObject(pRoot, "vote_for_addr", pRaftStore->voteFor.addr); + + char u64Buf[128]; + snprintf(u64Buf, sizeof(u64Buf), "%lu", pRaftStore->currentTerm); + cJSON_AddStringToObject(pRoot, "current_term", u64Buf); + + snprintf(u64Buf, sizeof(u64Buf), "%lu", pRaftStore->voteFor.addr); + cJSON_AddStringToObject(pRoot, "vote_for_addr", u64Buf); + cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId); + uint64_t u64 = pRaftStore->voteFor.addr; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pRoot, "addr_host", host); + cJSON_AddNumberToObject(pRoot, "addr_port", port); + char *serialized = cJSON_Print(pRoot); int len2 = strlen(serialized); assert(len2 < len); @@ -125,10 +141,12 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { cJSON *pRoot = cJSON_Parse(buf); cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term"); - pRaftStore->currentTerm = pCurrentTerm->valueint; + assert(cJSON_IsString(pCurrentTerm)); + sscanf(pCurrentTerm->valuestring, "%lu", &(pRaftStore->currentTerm)); cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr"); - pRaftStore->voteFor.addr = pVoteForAddr->valueint; + assert(cJSON_IsString(pVoteForAddr)); + sscanf(pVoteForAddr->valuestring, "%lu", &(pRaftStore->voteFor.addr)); cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid"); pRaftStore->voteFor.vgId = pVoteForVgid->valueint; @@ -139,11 +157,10 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { bool raftStoreHasVoted(SRaftStore *pRaftStore) { bool b = syncUtilEmptyId(&(pRaftStore->voteFor)); - return b; + return (!b); } void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId) { - assert(!raftStoreHasVoted(pRaftStore)); assert(!syncUtilEmptyId(pRaftId)); pRaftStore->voteFor = *pRaftId; raftStorePersist(pRaftStore); @@ -164,30 +181,68 @@ void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term) { raftStorePersist(pRaftStore); } +int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson) { return 0; } + +cJSON *raftStore2Json(SRaftStore *pRaftStore) { + char u64buf[128]; + cJSON *pRoot = cJSON_CreateObject(); + + if (pRaftStore != NULL) { + snprintf(u64buf, sizeof(u64buf), "%lu", pRaftStore->currentTerm); + cJSON_AddStringToObject(pRoot, "currentTerm", u64buf); + + cJSON *pVoteFor = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pRaftStore->voteFor.addr); + cJSON_AddStringToObject(pVoteFor, "addr", u64buf); + { + uint64_t u64 = pRaftStore->voteFor.addr; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pVoteFor, "addr_host", host); + cJSON_AddNumberToObject(pVoteFor, "addr_port", port); + } + cJSON_AddNumberToObject(pVoteFor, "vgId", pRaftStore->voteFor.vgId); + cJSON_AddItemToObject(pRoot, "voteFor", pVoteFor); + + int hasVoted = raftStoreHasVoted(pRaftStore); + cJSON_AddNumberToObject(pRoot, "hasVoted", hasVoted); + } + + cJSON *pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SRaftStore", pRoot); + return pJson; +} + +char *raftStore2Str(SRaftStore *pRaftStore) { + cJSON *pJson = raftStore2Json(pRaftStore); + char * serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + // for debug ------------------- void raftStorePrint(SRaftStore *pObj) { - char serialized[RAFT_STORE_BLOCK_SIZE]; - raftStoreSerialize(pObj, serialized, sizeof(serialized)); + char *serialized = raftStore2Str(pObj); printf("raftStorePrint | len:%lu | %s \n", strlen(serialized), serialized); fflush(NULL); + free(serialized); } void raftStorePrint2(char *s, SRaftStore *pObj) { - char serialized[RAFT_STORE_BLOCK_SIZE]; - raftStoreSerialize(pObj, serialized, sizeof(serialized)); + char *serialized = raftStore2Str(pObj); printf("raftStorePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); fflush(NULL); + free(serialized); } void raftStoreLog(SRaftStore *pObj) { - char serialized[RAFT_STORE_BLOCK_SIZE]; - raftStoreSerialize(pObj, serialized, sizeof(serialized)); + char *serialized = raftStore2Str(pObj); sTrace("raftStoreLog | len:%lu | %s", strlen(serialized), serialized); - fflush(NULL); + free(serialized); } void raftStoreLog2(char *s, SRaftStore *pObj) { - char serialized[RAFT_STORE_BLOCK_SIZE]; - raftStoreSerialize(pObj, serialized, sizeof(serialized)); + char *serialized = raftStore2Str(pObj); sTrace("raftStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); - fflush(NULL); + free(serialized); } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index b935943a1d..ca04db81b6 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -18,6 +18,7 @@ #include "syncMessage.h" #include "syncRaftEntry.h" #include "syncRaftLog.h" +#include "syncRaftStore.h" #include "syncUtil.h" // TLA+ Spec @@ -50,33 +51,54 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { int32_t ret = 0; for (int i = 0; i < pSyncNode->peersNum; ++i) { - SRaftId* pDestId = &(pSyncNode->peersId[i]); - SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); + SRaftId* pDestId = &(pSyncNode->peersId[i]); + // set prevLogIndex + SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); SyncIndex preLogIndex = nextIndex - 1; + // set preLogTerm SyncTerm preLogTerm = 0; if (preLogIndex >= SYNC_INDEX_BEGIN) { SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preLogIndex); + assert(pPreEntry != NULL); + preLogTerm = pPreEntry->term; + syncEntryDestory(pPreEntry); } - SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex); - assert(nextIndex == lastIndex); + // batch optimized + // SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex); - SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex); - assert(pEntry != NULL); + SyncAppendEntries* pMsg = NULL; + SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex); + if (pEntry != NULL) { + SyncAppendEntries* pMsg = syncAppendEntriesBuild(pEntry->bytes); + + // add pEntry into msg + uint32_t len; + char* serialized = syncEntrySerialize(pEntry, &len); + assert(len == pEntry->bytes); + memcpy(pMsg->data, serialized, len); + + free(serialized); + syncEntryDestory(pEntry); + + } else { + // maybe overflow, send empty record + SyncAppendEntries* pMsg = syncAppendEntriesBuild(0); + } - SyncAppendEntries* pMsg = syncAppendEntriesBuild(pEntry->bytes); pMsg->srcId = pSyncNode->myRaftId; pMsg->destId = *pDestId; + pMsg->term = pSyncNode->pRaftStore->currentTerm; pMsg->prevLogIndex = preLogIndex; pMsg->prevLogTerm = preLogTerm; pMsg->commitIndex = pSyncNode->commitIndex; - pMsg->dataLen = pEntry->bytes; - // add pEntry into msg + // send AppendEntries syncNodeAppendEntries(pSyncNode, pDestId, pMsg); + syncAppendEntriesDestroy(pMsg); } return ret; diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index be4f40aaad..062b0244bd 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -56,6 +56,8 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK && ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId)))); if (grant) { + // maybe has already voted for pMsg->srcId + // vote again, no harm raftStoreVote(ths->pRaftStore, &(pMsg->srcId)); } diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 60a2c008fe..11537c1e98 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -45,6 +45,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) return ret; } + assert(!(pMsg->term > ths->pRaftStore->currentTerm)); // no need this code, because if I receive reply.term, then I must have sent for that term. // if (pMsg->term > ths->pRaftStore->currentTerm) { // syncNodeUpdateTerm(ths, pMsg->term); @@ -52,17 +53,29 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) assert(pMsg->term == ths->pRaftStore->currentTerm); + // This tallies votes even when the current state is not Candidate, + // but they won't be looked at, so it doesn't matter. if (ths->state == TAOS_SYNC_STATE_CANDIDATE) { votesRespondAdd(ths->pVotesRespond, pMsg); if (pMsg->voteGranted) { + // add vote voteGrantedVote(ths->pVotesGranted, pMsg); + + // maybe to leader if (voteGrantedMajority(ths->pVotesGranted)) { - if (ths->pVotesGranted->toLeader) { + if (!ths->pVotesGranted->toLeader) { syncNodeCandidate2Leader(ths); + + // prevent to leader again! ths->pVotesGranted->toLeader = true; } } + } else { + ; + // do nothing + // UNCHANGED <> } } + return ret; } diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 3a48b0cbb3..0d3a3c3cc5 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -24,7 +24,8 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { if (pMsg->timeoutType == SYNC_TIMEOUT_PING) { if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) { ++(ths->pingTimerCounter); - syncNodePingAll(ths); + // syncNodePingAll(ths); + syncNodePingPeers(ths); } } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) { diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index 6cb6a28805..69a4c48f39 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -82,30 +82,32 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) { char u64buf[128]; cJSON *pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesGranted->replicaNum); - cJSON *pReplicas = cJSON_CreateArray(); - cJSON_AddItemToObject(pRoot, "replicas", pReplicas); - for (int i = 0; i < pVotesGranted->replicaNum; ++i) { - cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesGranted->replicas))[i])); - } - int *arr = (int *)malloc(sizeof(int) * pVotesGranted->replicaNum); - for (int i = 0; i < pVotesGranted->replicaNum; ++i) { - arr[i] = pVotesGranted->isGranted[i]; - } - cJSON *pIsGranted = cJSON_CreateIntArray(arr, pVotesGranted->replicaNum); - free(arr); - cJSON_AddItemToObject(pRoot, "isGranted", pIsGranted); + if (pVotesGranted != NULL) { + cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesGranted->replicaNum); + cJSON *pReplicas = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "replicas", pReplicas); + for (int i = 0; i < pVotesGranted->replicaNum; ++i) { + cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesGranted->replicas))[i])); + } + int *arr = (int *)malloc(sizeof(int) * pVotesGranted->replicaNum); + for (int i = 0; i < pVotesGranted->replicaNum; ++i) { + arr[i] = pVotesGranted->isGranted[i]; + } + cJSON *pIsGranted = cJSON_CreateIntArray(arr, pVotesGranted->replicaNum); + free(arr); + cJSON_AddItemToObject(pRoot, "isGranted", pIsGranted); - cJSON_AddNumberToObject(pRoot, "votes", pVotesGranted->votes); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pVotesGranted->term); - cJSON_AddStringToObject(pRoot, "term", u64buf); - cJSON_AddNumberToObject(pRoot, "quorum", pVotesGranted->quorum); - cJSON_AddNumberToObject(pRoot, "toLeader", pVotesGranted->toLeader); - snprintf(u64buf, sizeof(u64buf), "%p", pVotesGranted->pSyncNode); - cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + cJSON_AddNumberToObject(pRoot, "votes", pVotesGranted->votes); + snprintf(u64buf, sizeof(u64buf), "%lu", pVotesGranted->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + cJSON_AddNumberToObject(pRoot, "quorum", pVotesGranted->quorum); + cJSON_AddNumberToObject(pRoot, "toLeader", pVotesGranted->toLeader); + snprintf(u64buf, sizeof(u64buf), "%p", pVotesGranted->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); - bool majority = voteGrantedMajority(pVotesGranted); - cJSON_AddNumberToObject(pRoot, "majority", majority); + bool majority = voteGrantedMajority(pVotesGranted); + cJSON_AddNumberToObject(pRoot, "majority", majority); + } cJSON *pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SVotesGranted", pRoot); @@ -122,27 +124,27 @@ char *voteGranted2Str(SVotesGranted *pVotesGranted) { // for debug ------------------- void voteGrantedPrint(SVotesGranted *pObj) { char *serialized = voteGranted2Str(pObj); - printf("voteGrantedPrint | len:%zu | %s \n", strlen(serialized), serialized); + printf("voteGrantedPrint | len:%lu | %s \n", strlen(serialized), serialized); fflush(NULL); free(serialized); } void voteGrantedPrint2(char *s, SVotesGranted *pObj) { char *serialized = voteGranted2Str(pObj); - printf("voteGrantedPrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized); + printf("voteGrantedPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); fflush(NULL); free(serialized); } void voteGrantedLog(SVotesGranted *pObj) { char *serialized = voteGranted2Str(pObj); - sTrace("voteGrantedLog | len:%zu | %s", strlen(serialized), serialized); + sTrace("voteGrantedLog | len:%lu | %s", strlen(serialized), serialized); free(serialized); } void voteGrantedLog2(char *s, SVotesGranted *pObj) { char *serialized = voteGranted2Str(pObj); - sTrace("voteGrantedLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized); + sTrace("voteGrantedLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); free(serialized); } @@ -203,29 +205,31 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) { char u64buf[128]; cJSON *pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesRespond->replicaNum); - cJSON *pReplicas = cJSON_CreateArray(); - cJSON_AddItemToObject(pRoot, "replicas", pReplicas); - for (int i = 0; i < pVotesRespond->replicaNum; ++i) { - cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesRespond->replicas))[i])); - } - int respondNum = 0; - int *arr = (int *)malloc(sizeof(int) * pVotesRespond->replicaNum); - for (int i = 0; i < pVotesRespond->replicaNum; ++i) { - arr[i] = pVotesRespond->isRespond[i]; - if (pVotesRespond->isRespond[i]) { - respondNum++; + if (pVotesRespond != NULL) { + cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesRespond->replicaNum); + cJSON *pReplicas = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "replicas", pReplicas); + for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesRespond->replicas))[i])); } - } - cJSON *pIsRespond = cJSON_CreateIntArray(arr, pVotesRespond->replicaNum); - free(arr); - cJSON_AddItemToObject(pRoot, "isRespond", pIsRespond); - cJSON_AddNumberToObject(pRoot, "respondNum", respondNum); + int respondNum = 0; + int *arr = (int *)malloc(sizeof(int) * pVotesRespond->replicaNum); + for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + arr[i] = pVotesRespond->isRespond[i]; + if (pVotesRespond->isRespond[i]) { + respondNum++; + } + } + cJSON *pIsRespond = cJSON_CreateIntArray(arr, pVotesRespond->replicaNum); + free(arr); + cJSON_AddItemToObject(pRoot, "isRespond", pIsRespond); + cJSON_AddNumberToObject(pRoot, "respondNum", respondNum); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pVotesRespond->term); - cJSON_AddStringToObject(pRoot, "term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pVotesRespond->pSyncNode); - cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", pVotesRespond->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pVotesRespond->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + } cJSON *pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SVotesRespond", pRoot); @@ -242,26 +246,26 @@ char *votesRespond2Str(SVotesRespond *pVotesRespond) { // for debug ------------------- void votesRespondPrint(SVotesRespond *pObj) { char *serialized = votesRespond2Str(pObj); - printf("votesRespondPrint | len:%zu | %s \n", strlen(serialized), serialized); + printf("votesRespondPrint | len:%lu | %s \n", strlen(serialized), serialized); fflush(NULL); free(serialized); } void votesRespondPrint2(char *s, SVotesRespond *pObj) { char *serialized = votesRespond2Str(pObj); - printf("votesRespondPrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized); + printf("votesRespondPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); fflush(NULL); free(serialized); } void votesRespondLog(SVotesRespond *pObj) { char *serialized = votesRespond2Str(pObj); - sTrace("votesRespondLog | len:%zu | %s", strlen(serialized), serialized); + sTrace("votesRespondLog | len:%lu | %s", strlen(serialized), serialized); free(serialized); } void votesRespondLog2(char *s, SVotesRespond *pObj) { char *serialized = votesRespond2Str(pObj); - sTrace("votesRespondLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized); + sTrace("votesRespondLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); free(serialized); } diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 6ade78936d..2583deb32f 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -25,6 +25,9 @@ add_executable(syncTimeoutTest "") add_executable(syncPingTest "") add_executable(syncPingReplyTest "") add_executable(syncRpcMsgTest "") +add_executable(syncPingTimerTest2 "") +add_executable(syncPingSelfTest "") +add_executable(syncElectTest "") target_sources(syncTest @@ -135,6 +138,18 @@ target_sources(syncRpcMsgTest PRIVATE "syncRpcMsgTest.cpp" ) +target_sources(syncPingTimerTest2 + PRIVATE + "syncPingTimerTest2.cpp" +) +target_sources(syncPingSelfTest + PRIVATE + "syncPingSelfTest.cpp" +) +target_sources(syncElectTest + PRIVATE + "syncElectTest.cpp" +) target_include_directories(syncTest @@ -272,6 +287,26 @@ target_include_directories(syncRpcMsgTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncPingTimerTest2 + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncPingSelfTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncElectTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncElectTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -382,6 +417,18 @@ target_link_libraries(syncRpcMsgTest sync gtest_main ) +target_link_libraries(syncPingTimerTest2 + sync + gtest_main +) +target_link_libraries(syncPingSelfTest + sync + gtest_main +) +target_link_libraries(syncElectTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncElectTest.cpp b/source/libs/sync/test/syncElectTest.cpp new file mode 100644 index 0000000000..b8a1460f35 --- /dev/null +++ b/source/libs/sync/test/syncElectTest.cpp @@ -0,0 +1,124 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 3; +int32_t myIndex = 0; + +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM* pFsm; +SWal* pWal; +SSyncNode* gSyncNode; + +SSyncNode* syncNodeInit() { + syncInfo.vgId = 1234; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "./elect_test_%d", myIndex); + + int code = walInit(); + assert(code == 0); + SWalCfg walCfg; + memset(&walCfg, 0, sizeof(SWalCfg)); + walCfg.vgId = syncInfo.vgId; + walCfg.fsyncPeriod = 1000; + walCfg.retentionPeriod = 1000; + walCfg.rollPeriod = 1000; + walCfg.retentionSize = 1000; + walCfg.segSize = 1000; + walCfg.level = TAOS_WAL_FSYNC; + + char tmpdir[128]; + snprintf(tmpdir, sizeof(tmpdir), "./elect_test_wal_%d", myIndex); + pWal = walOpen(tmpdir, &walCfg); + assert(pWal != NULL); + + syncInfo.pWal = pWal; + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } + + SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +SSyncNode* syncInitTest() { return syncNodeInit(); } + +void initRaftId(SSyncNode* pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char* s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } +} + +int main(int argc, char** argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } + + int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + gSyncNode = syncInitTest(); + assert(gSyncNode != NULL); + syncNodePrint2((char*)"", gSyncNode); + + initRaftId(gSyncNode); + + //--------------------------- + while (1) { + sTrace("while 1 sleep, state: %d, %s", gSyncNode->state, syncUtilState2String(gSyncNode->state)); + taosMsleep(1000); + } + + return 0; +} diff --git a/source/libs/sync/test/syncInitTest.cpp b/source/libs/sync/test/syncInitTest.cpp index 7898fda8c0..a3e5f41c85 100644 --- a/source/libs/sync/test/syncInitTest.cpp +++ b/source/libs/sync/test/syncInitTest.cpp @@ -47,12 +47,11 @@ SSyncNode* syncNodeInit() { gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; gSyncIO->pSyncNode = pSyncNode; diff --git a/source/libs/sync/test/syncLogStoreTest.cpp b/source/libs/sync/test/syncLogStoreTest.cpp index 1b05f76fa2..c1cb66f382 100644 --- a/source/libs/sync/test/syncLogStoreTest.cpp +++ b/source/libs/sync/test/syncLogStoreTest.cpp @@ -81,7 +81,7 @@ SSyncNode* syncNodeInit() { SSyncNode* syncInitTest() { return syncNodeInit(); } void logStoreTest() { - logStorePrint2((char*)"logStoreTest2", pSyncNode->pLogStore); + logStorePrint2((char*)"logStoreTest", pSyncNode->pLogStore); assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_INVALID); @@ -105,10 +105,10 @@ void logStoreTest() { assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_BEGIN); } } - logStorePrint(pSyncNode->pLogStore); + logStorePrint2((char*)"after appendEntry", pSyncNode->pLogStore); pSyncNode->pLogStore->truncate(pSyncNode->pLogStore, 3); - logStorePrint(pSyncNode->pLogStore); + logStorePrint2((char*)"after truncate 3", pSyncNode->pLogStore); } void initRaftId(SSyncNode* pSyncNode) { diff --git a/source/libs/sync/test/syncPingSelfTest.cpp b/source/libs/sync/test/syncPingSelfTest.cpp new file mode 100644 index 0000000000..05e4d99cb0 --- /dev/null +++ b/source/libs/sync/test/syncPingSelfTest.cpp @@ -0,0 +1,102 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 3; +int32_t myIndex = 0; + +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM* pFsm; + +SSyncNode* syncNodeInit() { + syncInfo.vgId = 1234; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } + + SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +SSyncNode* syncInitTest() { return syncNodeInit(); } + +void initRaftId(SSyncNode* pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char* s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } +} + +int main(int argc, char** argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } + + int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + SSyncNode* pSyncNode = syncInitTest(); + assert(pSyncNode != NULL); + syncNodePrint2((char*)"", pSyncNode); + + initRaftId(pSyncNode); + + //--------------------------- + + while (1) { + syncNodePingSelf(pSyncNode); + taosMsleep(1000); + } + + return 0; +} diff --git a/source/libs/sync/test/syncPingTimerTest.cpp b/source/libs/sync/test/syncPingTimerTest.cpp index e69878632f..20d4a9ce58 100644 --- a/source/libs/sync/test/syncPingTimerTest.cpp +++ b/source/libs/sync/test/syncPingTimerTest.cpp @@ -47,6 +47,7 @@ SSyncNode* syncNodeInit() { gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; diff --git a/source/libs/sync/test/syncPingTimerTest2.cpp b/source/libs/sync/test/syncPingTimerTest2.cpp new file mode 100644 index 0000000000..2a041f3f5d --- /dev/null +++ b/source/libs/sync/test/syncPingTimerTest2.cpp @@ -0,0 +1,106 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 3; +int32_t myIndex = 0; + +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM* pFsm; + +SSyncNode* syncNodeInit() { + syncInfo.vgId = 1234; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } + + SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +SSyncNode* syncInitTest() { return syncNodeInit(); } + +void initRaftId(SSyncNode* pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char* s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } +} + +int main(int argc, char** argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } + + int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + SSyncNode* pSyncNode = syncInitTest(); + assert(pSyncNode != NULL); + syncNodePrint2((char*)"", pSyncNode); + + initRaftId(pSyncNode); + + //--------------------------- + + sTrace("syncNodeStartPingTimer ..."); + ret = syncNodeStartPingTimer(pSyncNode); + assert(ret == 0); + + while (1) { + sTrace("while 1 sleep ..."); + taosMsleep(1000); + } + + return 0; +} diff --git a/source/libs/sync/test/syncRaftStoreTest.cpp b/source/libs/sync/test/syncRaftStoreTest.cpp index 0c1c9b881e..688802625a 100644 --- a/source/libs/sync/test/syncRaftStoreTest.cpp +++ b/source/libs/sync/test/syncRaftStoreTest.cpp @@ -3,6 +3,7 @@ #include #include "syncIO.h" #include "syncInt.h" +#include "syncUtil.h" void logTest() { sTrace("--- sync log test: trace"); @@ -13,6 +14,21 @@ void logTest() { sFatal("--- sync log test: fatal"); } +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 5; +int32_t myIndex = 0; +SRaftId ids[TSDB_MAX_REPLICA]; + +void initRaftId() { + for (int i = 0; i < replicaNum; ++i) { + ids[i].addr = syncUtilAddr2U64("127.0.0.1", ports[i]); + ids[i].vgId = 1234; + char* s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } +} + int main() { // taosInitLog((char *)"syncTest.log", 100000, 10); tsAsyncLog = 0; @@ -20,23 +36,35 @@ int main() { logTest(); - SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json"); + initRaftId(); + + SRaftStore* pRaftStore = raftStoreOpen("./test_raft_store.json"); assert(pRaftStore != NULL); - raftStorePrint(pRaftStore); + raftStorePrint2((char*)"==raftStoreOpen==", pRaftStore); -#if 0 - pRaftStore->currentTerm = 100; - pRaftStore->voteFor.addr = 200; - pRaftStore->voteFor.vgId = 300; - raftStorePersist(pRaftStore); - raftStorePrint(pRaftStore); -#endif + raftStoreSetTerm(pRaftStore, 100); + raftStorePrint2((char*)"==raftStoreSetTerm==", pRaftStore); - ++(pRaftStore->currentTerm); - ++(pRaftStore->voteFor.addr); - ++(pRaftStore->voteFor.vgId); - raftStorePersist(pRaftStore); - raftStorePrint(pRaftStore); + raftStoreVote(pRaftStore, &ids[0]); + raftStorePrint2((char*)"==raftStoreVote==", pRaftStore); + + raftStoreClearVote(pRaftStore); + raftStorePrint2((char*)"==raftStoreClearVote==", pRaftStore); + + raftStoreVote(pRaftStore, &ids[1]); + raftStorePrint2((char*)"==raftStoreVote==", pRaftStore); + + raftStoreNextTerm(pRaftStore); + raftStorePrint2((char*)"==raftStoreNextTerm==", pRaftStore); + + raftStoreNextTerm(pRaftStore); + raftStorePrint2((char*)"==raftStoreNextTerm==", pRaftStore); + + raftStoreNextTerm(pRaftStore); + raftStorePrint2((char*)"==raftStoreNextTerm==", pRaftStore); + + raftStoreNextTerm(pRaftStore); + raftStorePrint2((char*)"==raftStoreNextTerm==", pRaftStore); return 0; }