Merge pull request #10659 from taosdata/feature/3.0_mhli

Feature/3.0 mhli
This commit is contained in:
Li Minghao 2022-03-10 16:28:30 +08:00 committed by GitHub
commit 19438fbbf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 849 additions and 328 deletions

View File

@ -24,6 +24,7 @@ extern "C" {
#include <tdatablock.h>
#include "taosdef.h"
#include "trpc.h"
#include "wal.h"
typedef uint64_t SyncNodeId;
typedef int32_t SyncGroupId;
@ -87,25 +88,22 @@ typedef struct SSyncFSM {
} SSyncFSM;
struct SSyncRaftEntry;
typedef struct SSyncRaftEntry SSyncRaftEntry;
// abstract definition of log store in raft
// SWal implements it
typedef struct SSyncLogStore {
void* data;
// append one log entry
int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SRpcMsg* pBuf);
int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
// get one log entry, user need to free pBuf->data
int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pBuf);
// get one log entry, user need to free pEntry->pCont
SSyncRaftEntry* (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index);
// update log store commit index with "index"
int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index);
// truncate log with index, entries after the given index (>index) will be deleted
int32_t (*truncate)(struct SSyncLogStore* pLogStore, SyncIndex index);
// return commit index of log
SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore);
// truncate log with index, entries after the given index (>=index) will be deleted
int32_t (*truncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
// return index of last entry
SyncIndex (*getLastIndex)(struct SSyncLogStore* pLogStore);
@ -113,6 +111,12 @@ typedef struct SSyncLogStore {
// return term of last entry
SyncTerm (*getLastTerm)(struct SSyncLogStore* pLogStore);
// update log store commit index with "index"
int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index);
// return commit index of log
SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore);
} SSyncLogStore;
// raft need to persist two variables in storage: currentTerm, voteFor
@ -134,7 +138,7 @@ typedef struct SSyncInfo {
SyncGroupId vgId;
SSyncCfg syncCfg;
char path[TSDB_FILENAME_LEN];
char walPath[TSDB_FILENAME_LEN];
SWal* pWal;
SSyncFSM* pFsm;
void* rpcClient;
@ -153,7 +157,7 @@ void syncCleanUp();
int64_t syncStart(const SSyncInfo* pSyncInfo);
void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak);
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
ESyncState syncGetMyRole(int64_t rid);
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole);

View File

@ -25,7 +25,6 @@ extern "C" {
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaft.h"
#include "taosdef.h"
// TLA+ Spec

View File

@ -25,7 +25,6 @@ extern "C" {
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaft.h"
#include "taosdef.h"
// TLA+ Spec

View File

@ -116,7 +116,8 @@ typedef struct SSyncNode {
SyncGroupId vgId;
SSyncCfg syncCfg;
char path[TSDB_FILENAME_LEN];
char walPath[TSDB_FILENAME_LEN];
char raftStorePath[TSDB_FILENAME_LEN * 2];
SWal* pWal;
void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
void* queue;
@ -195,8 +196,6 @@ typedef struct SSyncNode {
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
void syncNodeClose(SSyncNode* pSyncNode);
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode);
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
@ -213,6 +212,11 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
// for debug
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode);
void syncNodePrint(char* s, const SSyncNode* pSyncNode);
#ifdef __cplusplus
}
#endif

View File

@ -24,8 +24,7 @@ extern "C" {
#include <stdio.h>
#include <stdlib.h>
#include "cJSON.h"
#include "sync.h"
#include "syncRaftEntry.h"
#include "syncInt.h"
#include "taosdef.h"
// encode as uint32
@ -46,6 +45,7 @@ typedef enum ESyncMessageType {
// ---------------------------------------------
cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg);
cJSON* syncRpcUnknownMsg2Json();
char* syncRpcMsg2Str(SRpcMsg* pRpcMsg);
// ---------------------------------------------
typedef enum ESyncTimeoutType {
@ -123,12 +123,22 @@ SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId);
typedef struct SyncClientRequest {
uint32_t bytes;
uint32_t msgType;
int64_t seqNum;
uint32_t originalRpcType;
uint64_t seqNum;
bool isWeak;
uint32_t dataLen;
char data[];
} SyncClientRequest;
SyncClientRequest* syncClientRequestBuild(uint32_t dataLen);
void syncClientRequestDestroy(SyncClientRequest* pMsg);
void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen);
void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg);
void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg);
void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg);
cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg);
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak);
// ---------------------------------------------
typedef struct SyncClientRequestReply {
uint32_t bytes;

View File

@ -23,7 +23,6 @@ extern "C" {
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncRaft.h"
#include "taosdef.h"
#ifdef __cplusplus

View File

@ -1,93 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_RAFT_H
#define _TD_LIBS_SYNC_RAFT_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "sync.h"
#include "syncMessage.h"
#include "taosdef.h"
#if 0
typedef struct SRaftId {
SyncNodeId addr;
SyncGroupId vgId;
} SRaftId;
typedef struct SRaft {
SRaftId id;
SSyncFSM* pFsm;
int32_t (*FpPing)(struct SRaft* ths, const RaftPing* pMsg);
int32_t (*FpOnPing)(struct SRaft* ths, RaftPing* pMsg);
int32_t (*FpOnPingReply)(struct SRaft* ths, RaftPingReply* pMsg);
int32_t (*FpRequestVote)(struct SRaft* ths, const RaftRequestVote* pMsg);
int32_t (*FpOnRequestVote)(struct SRaft* ths, RaftRequestVote* pMsg);
int32_t (*FpOnRequestVoteReply)(struct SRaft* ths, RaftRequestVoteReply* pMsg);
int32_t (*FpAppendEntries)(struct SRaft* ths, const RaftAppendEntries* pMsg);
int32_t (*FpOnAppendEntries)(struct SRaft* ths, RaftAppendEntries* pMsg);
int32_t (*FpOnAppendEntriesReply)(struct SRaft* ths, RaftAppendEntriesReply* pMsg);
} SRaft;
SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm);
void raftClose(SRaft* pRaft);
static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg);
static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg);
static int32_t onRaftPingReply(struct SRaft* ths, RaftPingReply* pMsg);
static int32_t doRaftRequestVote(struct SRaft* ths, const RaftRequestVote* pMsg);
static int32_t onRaftRequestVote(struct SRaft* ths, RaftRequestVote* pMsg);
static int32_t onRaftRequestVoteReply(struct SRaft* ths, RaftRequestVoteReply* pMsg);
static int32_t doRaftAppendEntries(struct SRaft* ths, const RaftAppendEntries* pMsg);
static int32_t onRaftAppendEntries(struct SRaft* ths, RaftAppendEntries* pMsg);
static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesReply* pMsg);
int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak);
static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft);
#endif
#ifdef __cplusplus
}
#endif
#endif /*_TD_LIBS_SYNC_RAFT_H*/

View File

@ -24,15 +24,31 @@ extern "C" {
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "taosdef.h"
typedef struct SSyncRaftEntry {
SyncTerm term;
SyncIndex index;
SSyncBuffer data;
int8_t flag;
uint32_t bytes;
uint32_t msgType;
uint32_t originalRpcType;
uint64_t seqNum;
bool isWeak;
SyncTerm term;
SyncIndex index;
uint32_t dataLen;
char data[];
} SSyncRaftEntry;
SSyncRaftEntry* syncEntryBuild(uint32_t dataLen);
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index);
void syncEntryDestory(SSyncRaftEntry* pEntry);
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len);
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len);
cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry);
char* syncEntry2Str(const SSyncRaftEntry* pEntry);
void syncEntryPrint(const SSyncRaftEntry* pEntry);
void syncEntryPrint2(char *s, const SSyncRaftEntry* pEntry);
#ifdef __cplusplus
}
#endif

View File

@ -24,27 +24,47 @@ extern "C" {
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncRaftEntry.h"
#include "taosdef.h"
int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf);
typedef struct SSyncLogStoreData {
SSyncNode* pSyncNode;
SWal* pWal;
} SSyncLogStoreData;
// get one log entry, user need to free pBuf->data
int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf);
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode);
// update log store commit index with "index"
int32_t raftLogUpdateCommitIndex(struct SSyncLogStore* pLogStore, SyncIndex index);
void logStoreDestory(SSyncLogStore* pLogStore);
// truncate log with index, entries after the given index (>index) will be deleted
int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex index);
// append one log entry
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
// return commit index of log
SyncIndex raftLogGetCommitIndex(struct SSyncLogStore* pLogStore);
// get one log entry, user need to free pEntry->pCont
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index);
// truncate log with index, entries after the given index (>=index) will be deleted
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex);
// return index of last entry
SyncIndex raftLogGetLastIndex(struct SSyncLogStore* pLogStore);
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore);
// return term of last entry
SyncTerm raftLogGetLastTerm(struct SSyncLogStore* pLogStore);
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore);
// update log store commit index with "index"
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
// return commit index of log
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore);
cJSON* logStore2Json(SSyncLogStore* pLogStore);
char* logStore2Str(SSyncLogStore* pLogStore);
// for debug
void logStorePrint(SSyncLogStore* pLogStore);
#ifdef __cplusplus
}

View File

@ -25,7 +25,6 @@ extern "C" {
#include <stdlib.h>
#include "cJSON.h"
#include "syncInt.h"
#include "syncRaft.h"
#include "taosdef.h"
#define RAFT_STORE_BLOCK_SIZE 512

View File

@ -25,7 +25,6 @@ extern "C" {
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaft.h"
#include "taosdef.h"
// TLA+ Spec

View File

@ -25,7 +25,6 @@ extern "C" {
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaft.h"
#include "taosdef.h"
// TLA+ Spec

View File

@ -24,7 +24,6 @@ extern "C" {
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncRaft.h"
#include "taosdef.h"
int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot);

View File

@ -25,7 +25,6 @@ extern "C" {
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaft.h"
#include "taosdef.h"
// TLA+ Spec

View File

@ -49,6 +49,9 @@ cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p);
cJSON* syncUtilRaftId2Json(const SRaftId* p);
char* syncUtilRaftId2Str(const SRaftId* p);
const char* syncUtilState2String(ESyncState state);
bool syncUtilCanPrint(char c);
char* syncUtilprintBin(char* ptr, uint32_t len);
char* syncUtilprintBin2(char* ptr, uint32_t len);
#ifdef __cplusplus
}

View File

@ -15,7 +15,7 @@
#include "syncIO.h"
#include <tdatablock.h>
#include "syncOnMessage.h"
#include "syncMessage.h"
#include "tglobal.h"
#include "ttimer.h"
#include "tutil.h"
@ -220,12 +220,17 @@ static void *syncIOConsumerFunc(void *param) {
while (1) {
int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL);
sTrace("syncIOConsumerFunc %d msgs are received", numOfMsgs);
if (numOfMsgs <= 0) break;
if (numOfMsgs <= 0) {
break;
}
for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
char *s = syncRpcMsg2Str(pRpcMsg);
sTrace("syncIOConsumerFunc get item from queue: msgType:%d contLen:%d msg:%s", pRpcMsg->msgType, pRpcMsg->contLen,
(char *)(pRpcMsg->pCont));
s);
free(s);
if (pRpcMsg->msgType == SYNC_PING) {
if (io->FpOnSyncPing != NULL) {
@ -247,7 +252,7 @@ static void *syncIOConsumerFunc(void *param) {
}
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) {
if (io->FpOnSyncRequestVote) {
if (io->FpOnSyncRequestVote != NULL) {
SyncRequestVote *pSyncMsg;
pSyncMsg = syncRequestVoteBuild(pRpcMsg->contLen);
syncRequestVoteFromRpcMsg(pRpcMsg, pSyncMsg);
@ -256,7 +261,7 @@ static void *syncIOConsumerFunc(void *param) {
}
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) {
if (io->FpOnSyncRequestVoteReply) {
if (io->FpOnSyncRequestVoteReply != NULL) {
SyncRequestVoteReply *pSyncMsg;
pSyncMsg = SyncRequestVoteReplyBuild();
syncRequestVoteReplyFromRpcMsg(pRpcMsg, pSyncMsg);
@ -265,7 +270,7 @@ static void *syncIOConsumerFunc(void *param) {
}
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) {
if (io->FpOnSyncAppendEntries) {
if (io->FpOnSyncAppendEntries != NULL) {
SyncAppendEntries *pSyncMsg;
pSyncMsg = syncAppendEntriesBuild(pRpcMsg->contLen);
syncAppendEntriesFromRpcMsg(pRpcMsg, pSyncMsg);
@ -274,7 +279,7 @@ static void *syncIOConsumerFunc(void *param) {
}
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) {
if (io->FpOnSyncAppendEntriesReply) {
if (io->FpOnSyncAppendEntriesReply != NULL) {
SyncAppendEntriesReply *pSyncMsg;
pSyncMsg = syncAppendEntriesReplyBuild();
syncAppendEntriesReplyFromRpcMsg(pRpcMsg, pSyncMsg);

View File

@ -18,8 +18,9 @@
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
#include "syncEnv.h"
#include "syncIndexMgr.h"
#include "syncInt.h"
#include "syncRaft.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
@ -59,13 +60,32 @@ int64_t syncStart(const SSyncInfo* pSyncInfo) {
return 0;
}
void syncStop(int64_t rid) {}
void syncStop(int64_t rid) {
SSyncNode* pSyncNode = NULL; // get pointer from rid
syncNodeClose(pSyncNode);
}
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; }
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; }
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
SSyncNode* pSyncNode = NULL; // get pointer from rid
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak);
SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
syncClientRequestDestroy(pSyncMsg);
} else {
sTrace("syncForwardToPeer not leader, %s", syncUtilState2String(pSyncNode->state));
return -1; // need define err code !!
}
return 0;
}
ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; }
ESyncState syncGetMyRole(int64_t rid) {
SSyncNode* pSyncNode = NULL; // get pointer from rid
return pSyncNode->state;
}
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {}
@ -78,7 +98,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->vgId = pSyncInfo->vgId;
pSyncNode->syncCfg = pSyncInfo->syncCfg;
memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
memcpy(pSyncNode->walPath, pSyncInfo->walPath, sizeof(pSyncNode->walPath));
snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s/raft_store.json", pSyncInfo->path);
pSyncNode->pWal = pSyncInfo->pWal;
pSyncNode->rpcClient = pSyncInfo->rpcClient;
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
pSyncNode->queue = pSyncInfo->queue;
@ -114,20 +135,27 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
// init life cycle
// init server vars
// init TLA+ server vars
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->pRaftStore = raftStoreOpen(pSyncInfo->walPath);
pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
assert(pSyncNode->pRaftStore != NULL);
// init candidate vars
// init TLA+ candidate vars
pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
assert(pSyncNode->pVotesGranted != NULL);
pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
assert(pSyncNode->pVotesRespond != NULL);
// init leader vars
pSyncNode->pNextIndex = NULL;
pSyncNode->pMatchIndex = NULL;
// init TLA+ leader vars
pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
assert(pSyncNode->pNextIndex != NULL);
pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
assert(pSyncNode->pMatchIndex != NULL);
// init TLA+ log vars
pSyncNode->pLogStore = logStoreCreate(pSyncNode);
assert(pSyncNode->pLogStore != NULL);
pSyncNode->commitIndex = 0;
// init ping timer
pSyncNode->pPingTimer = NULL;
@ -177,7 +205,8 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
// init by SSyncInfo
cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId);
cJSON_AddStringToObject(pRoot, "path", pSyncNode->path);
cJSON_AddStringToObject(pRoot, "walPath", pSyncNode->walPath);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal);
cJSON_AddStringToObject(pRoot, "pWal", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->rpcClient);
cJSON_AddStringToObject(pRoot, "rpcClient", u64buf);
@ -298,6 +327,13 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
return serialized;
}
void syncNodePrint(char* s, const SSyncNode* pSyncNode) {
char* ss = syncNode2Str(pSyncNode);
// sTrace("syncNodePrint: %s [len:%lu]| %s", s, strlen(ss), ss);
fprintf(stderr, "syncNodePrint: %s [len:%lu]| %s", s, strlen(ss), ss);
free(ss);
}
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet);
@ -472,6 +508,8 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
}
static void syncNodeEqPingTimer(void* param, void* tmrId) {
sTrace("<-- syncNodeEqPingTimer -->");
SSyncNode* pSyncNode = (SSyncNode*)param;
if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) {
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock),
@ -484,7 +522,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
// reset timer ms
// pSyncNode->pingTimerMS += 100;
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer);
} else {
sTrace("syncNodeEqPingTimer: pingTimerLogicClock:%lu, pingTimerLogicClockUser:%lu", pSyncNode->pingTimerLogicClock,
@ -506,7 +544,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
// reset timer ms
pSyncNode->electTimerMS = syncUtilElectRandomMS();
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer);
} else {
sTrace("syncNodeEqElectTimer: electTimerLogicClock:%lu, electTimerLogicClockUser:%lu",
@ -530,7 +568,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
// reset timer ms
// pSyncNode->heartbeatTimerMS += 100;
taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pHeartbeatTimer);
} else {
sTrace("syncNodeEqHeartbeatTimer: heartbeatTimerLogicClock:%lu, heartbeatTimerLogicClockUser:%lu",

View File

@ -14,7 +14,6 @@
*/
#include "syncMessage.h"
#include "syncRaft.h"
#include "syncUtil.h"
#include "tcoding.h"
@ -36,7 +35,8 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
pRoot = syncPingReply2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) {
pRoot = syncRpcUnknownMsg2Json();
SyncClientRequest* pSyncMsg = (SyncClientRequest*)pRpcMsg->pCont;
pRoot = syncClientRequest2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST_REPLY) {
pRoot = syncRpcUnknownMsg2Json();
@ -76,6 +76,13 @@ cJSON* syncRpcUnknownMsg2Json() {
return pJson;
}
char* syncRpcMsg2Str(SRpcMsg* pRpcMsg) {
cJSON* pJson = syncRpcMsg2Json(pRpcMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// ---- message process SyncTimeout----
SyncTimeout* syncTimeoutBuild() {
uint32_t bytes = sizeof(SyncTimeout);
@ -149,6 +156,7 @@ SyncPing* syncPingBuild(uint32_t dataLen) {
pMsg->bytes = bytes;
pMsg->msgType = SYNC_PING;
pMsg->dataLen = dataLen;
return pMsg;
}
void syncPingDestroy(SyncPing* pMsg) {
@ -247,6 +255,7 @@ SyncPingReply* syncPingReplyBuild(uint32_t dataLen) {
pMsg->bytes = bytes;
pMsg->msgType = SYNC_PING_REPLY;
pMsg->dataLen = dataLen;
return pMsg;
}
void syncPingReplyDestroy(SyncPingReply* pMsg) {
@ -337,6 +346,73 @@ SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId)
return pMsg;
}
// ---- message process SyncClientRequest----
SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) {
uint32_t bytes = sizeof(SyncClientRequest) + dataLen;
SyncClientRequest* pMsg = malloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = SYNC_CLIENT_REQUEST;
pMsg->seqNum = 0;
pMsg->isWeak = false;
pMsg->dataLen = dataLen;
return pMsg;
}
void syncClientRequestDestroy(SyncClientRequest* pMsg) {
if (pMsg != NULL) {
free(pMsg);
}
}
void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen) {
assert(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg) {
memcpy(pMsg, buf, len);
assert(len == pMsg->bytes);
}
void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncClientRequestSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg) {
syncClientRequestDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) {
char u64buf[128];
cJSON* pRoot = cJSON_CreateObject();
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->seqNum);
cJSON_AddStringToObject(pRoot, "seqNum", u64buf);
cJSON_AddNumberToObject(pRoot, "isWeak", pMsg->isWeak);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncClientRequest", pRoot);
return pJson;
}
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak) {
SyncClientRequest* pMsg = syncClientRequestBuild(pOriginalRpcMsg->contLen);
pMsg->originalRpcType = pOriginalRpcMsg->msgType;
pMsg->seqNum = seqNum;
pMsg->isWeak = isWeak;
memcpy(pMsg->data, pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen);
return pMsg;
}
// ---- message process SyncRequestVote----
SyncRequestVote* syncRequestVoteBuild() {
uint32_t bytes = sizeof(SyncRequestVote);
@ -344,6 +420,7 @@ SyncRequestVote* syncRequestVoteBuild() {
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = SYNC_REQUEST_VOTE;
return pMsg;
}
void syncRequestVoteDestroy(SyncRequestVote* pMsg) {
@ -429,6 +506,7 @@ SyncRequestVoteReply* SyncRequestVoteReplyBuild() {
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = SYNC_REQUEST_VOTE_REPLY;
return pMsg;
}
void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg) {
@ -512,6 +590,7 @@ SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen) {
pMsg->bytes = bytes;
pMsg->msgType = SYNC_APPEND_ENTRIES;
pMsg->dataLen = dataLen;
return pMsg;
}
void syncAppendEntriesDestroy(SyncAppendEntries* pMsg) {
@ -604,6 +683,7 @@ SyncAppendEntriesReply* syncAppendEntriesReplyBuild() {
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = SYNC_APPEND_ENTRIES_REPLY;
return pMsg;
}
void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg) {

View File

@ -1,70 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncRaft.h"
#include "sync.h"
#if 0
SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm) {
SRaft* pRaft = (SRaft*)malloc(sizeof(SRaft));
assert(pRaft != NULL);
pRaft->id = raftId;
pRaft->pFsm = pFsm;
pRaft->FpPing = doRaftPing;
pRaft->FpOnPing = onRaftPing;
pRaft->FpOnPingReply = onRaftPingReply;
pRaft->FpRequestVote = doRaftRequestVote;
pRaft->FpOnRequestVote = onRaftRequestVote;
pRaft->FpOnRequestVoteReply = onRaftRequestVoteReply;
pRaft->FpAppendEntries = doRaftAppendEntries;
pRaft->FpOnAppendEntries = onRaftAppendEntries;
pRaft->FpOnAppendEntriesReply = onRaftAppendEntriesReply;
return pRaft;
}
void raftClose(SRaft* pRaft) {
assert(pRaft != NULL);
free(pRaft);
}
static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg) { return 0; }
static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg) { return 0; }
static int32_t onRaftPingReply(struct SRaft* ths, RaftPingReply* pMsg) { return 0; }
static int32_t doRaftRequestVote(struct SRaft* ths, const RaftRequestVote* pMsg) { return 0; }
static int32_t onRaftRequestVote(struct SRaft* ths, RaftRequestVote* pMsg) { return 0; }
static int32_t onRaftRequestVoteReply(struct SRaft* ths, RaftRequestVoteReply* pMsg) { return 0; }
static int32_t doRaftAppendEntries(struct SRaft* ths, const RaftAppendEntries* pMsg) { return 0; }
static int32_t onRaftAppendEntries(struct SRaft* ths, RaftAppendEntries* pMsg) { return 0; }
static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesReply* pMsg) { return 0; }
int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; }
static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft) { return 0; }
#endif

View File

@ -14,3 +14,104 @@
*/
#include "syncRaftEntry.h"
#include "syncUtil.h"
SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) {
uint32_t bytes = sizeof(SSyncRaftEntry) + dataLen;
SSyncRaftEntry* pEntry = malloc(bytes);
assert(pEntry != NULL);
memset(pEntry, 0, bytes);
pEntry->bytes = bytes;
pEntry->dataLen = dataLen;
return pEntry;
}
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {
SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen);
assert(pEntry != NULL);
pEntry->msgType = pMsg->msgType;
pEntry->originalRpcType = pMsg->originalRpcType;
pEntry->seqNum = pMsg->seqNum;
pEntry->isWeak = pMsg->isWeak;
pEntry->term = term;
pEntry->index = index;
pEntry->dataLen = pMsg->dataLen;
memcpy(pEntry->data, pMsg->data, pMsg->dataLen);
return pEntry;
}
void syncEntryDestory(SSyncRaftEntry* pEntry) {
if (pEntry != NULL) {
free(pEntry);
}
}
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) {
char* buf = malloc(pEntry->bytes);
assert(buf != NULL);
memcpy(buf, pEntry, pEntry->bytes);
if (len != NULL) {
*len = pEntry->bytes;
}
return buf;
}
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SSyncRaftEntry* pEntry = malloc(bytes);
assert(pEntry != NULL);
memcpy(pEntry, buf, len);
assert(len == pEntry->bytes);
return pEntry;
}
cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) {
char u64buf[128];
cJSON* pRoot = cJSON_CreateObject();
cJSON_AddNumberToObject(pRoot, "bytes", pEntry->bytes);
cJSON_AddNumberToObject(pRoot, "msgType", pEntry->msgType);
cJSON_AddNumberToObject(pRoot, "originalRpcType", pEntry->originalRpcType);
snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->seqNum);
cJSON_AddStringToObject(pRoot, "seqNum", u64buf);
cJSON_AddNumberToObject(pRoot, "isWeak", pEntry->isWeak);
snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->index);
cJSON_AddStringToObject(pRoot, "index", u64buf);
cJSON_AddNumberToObject(pRoot, "dataLen", pEntry->dataLen);
char* s;
s = syncUtilprintBin((char*)(pEntry->data), pEntry->dataLen);
cJSON_AddStringToObject(pRoot, "data", s);
free(s);
s = syncUtilprintBin2((char*)(pEntry->data), pEntry->dataLen);
cJSON_AddStringToObject(pRoot, "data2", s);
free(s);
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SSyncRaftEntry", pRoot);
return pJson;
}
char* syncEntry2Str(const SSyncRaftEntry* pEntry) {
cJSON* pJson = syncEntry2Json(pEntry);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
void syncEntryPrint(const SSyncRaftEntry* pEntry) {
char* s = syncEntry2Str(pEntry);
sTrace("%s", s);
free(s);
}
void syncEntryPrint2(char* s, const SSyncRaftEntry* pEntry) {
char* ss = syncEntry2Str(pEntry);
sTrace("%s | %s", s, ss);
free(ss);
}

View File

@ -14,46 +14,160 @@
*/
#include "syncRaftLog.h"
#include "wal.h"
int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf) { return 0; }
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
SSyncLogStore* pLogStore = malloc(sizeof(SSyncLogStore));
assert(pLogStore != NULL);
// get one log entry, user need to free pBuf->data
int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf) { return 0; }
pLogStore->data = malloc(sizeof(SSyncLogStoreData));
assert(pLogStore->data != NULL);
// TLA+ Spec
// \* Leader i advances its commitIndex.
// \* This is done as a separate step from handling AppendEntries responses,
// \* in part to minimize atomic regions, and in part so that leaders of
// \* single-server clusters are able to mark entries committed.
// AdvanceCommitIndex(i) ==
// /\ state[i] = Leader
// /\ LET \* The set of servers that agree up through index.
// Agree(index) == {i} \cup {k \in Server :
// matchIndex[i][k] >= index}
// \* The maximum indexes for which a quorum agrees
// agreeIndexes == {index \in 1..Len(log[i]) :
// Agree(index) \in Quorum}
// \* New value for commitIndex'[i]
// newCommitIndex ==
// IF /\ agreeIndexes /= {}
// /\ log[i][Max(agreeIndexes)].term = currentTerm[i]
// THEN
// Max(agreeIndexes)
// ELSE
// commitIndex[i]
// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
//
int32_t raftLogupdateCommitIndex(struct SSyncLogStore* pLogStore, SyncIndex index) { return 0; }
SSyncLogStoreData* pData = pLogStore->data;
pData->pSyncNode = pSyncNode;
pData->pWal = pSyncNode->pWal;
// truncate log with index, entries after the given index (>index) will be deleted
int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex index) { return 0; }
pLogStore->appendEntry = logStoreAppendEntry;
pLogStore->getEntry = logStoreGetEntry;
pLogStore->truncate = logStoreTruncate;
pLogStore->getLastIndex = logStoreLastIndex;
pLogStore->getLastTerm = logStoreLastTerm;
pLogStore->updateCommitIndex = logStoreUpdateCommitIndex;
pLogStore->getCommitIndex = logStoreGetCommitIndex;
}
// return commit index of log
SyncIndex raftLogGetCommitIndex(struct SSyncLogStore* pLogStore) { return 0; }
void logStoreDestory(SSyncLogStore* pLogStore) {
if (pLogStore != NULL) {
free(pLogStore->data);
free(pLogStore);
}
}
// append one log entry
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
assert(pEntry->index == logStoreLastIndex(pLogStore) + 1);
uint32_t len;
char* serialized = syncEntrySerialize(pEntry, &len);
assert(serialized != NULL);
int code;
code = walWrite(pWal, pEntry->index, pEntry->msgType, serialized, len);
assert(code == 0);
walFsync(pWal, true);
free(serialized);
}
// get one log entry, user need to free pEntry->pCont
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SSyncRaftEntry* pEntry;
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
walReadWithHandle(pWalHandle, index);
pEntry = syncEntryDeserialize(pWalHandle->pHead->head.body, pWalHandle->pHead->head.len);
assert(pEntry != NULL);
// need to hold, do not new every time!!
walCloseReadHandle(pWalHandle);
return pEntry;
}
// truncate log with index, entries after the given index (>=index) will be deleted
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
walRollback(pWal, fromIndex);
}
// return index of last entry
SyncIndex raftLogGetLastIndex(struct SSyncLogStore* pLogStore) { return 0; }
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SyncIndex lastIndex = walGetLastVer(pWal);
return lastIndex;
}
// return term of last entry
SyncTerm raftLogGetLastTerm(struct SSyncLogStore* pLogStore) { return 0; }
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
SyncTerm lastTerm = 0;
SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore);
if (pLastEntry != NULL) {
lastTerm = pLastEntry->term;
free(pLastEntry);
}
return lastTerm;
}
// update log store commit index with "index"
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
walCommit(pWal, index);
}
// return commit index of log
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
return pData->pSyncNode->commitIndex;
}
SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SyncIndex lastIndex = walGetLastVer(pWal);
SSyncRaftEntry* pEntry = NULL;
if (lastIndex > 0) {
pEntry = logStoreGetEntry(pLogStore, lastIndex);
}
return pEntry;
}
cJSON* logStore2Json(SSyncLogStore* pLogStore) {
char u64buf[128];
SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
cJSON* pRoot = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
cJSON_AddStringToObject(pRoot, "pWal", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastIndex(pLogStore));
cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore));
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
cJSON* pEntries = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "pEntries", pEntries);
SyncIndex lastIndex = logStoreLastIndex(pLogStore);
for (SyncIndex i = 0; i <= lastIndex; ++i) {
SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i);
cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
syncEntryDestory(pEntry);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot);
return pJson;
}
char* logStore2Str(SSyncLogStore* pLogStore) {
cJSON* pJson = logStore2Json(pLogStore);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug
void logStorePrint(SSyncLogStore* pLogStore) {
char* s = logStore2Str(pLogStore);
// sTrace("%s", s);
fprintf(stderr, "logStorePrint: [len:%lu]| %s \n", strlen(s), s);
free(s);
}

View File

@ -14,7 +14,6 @@
*/
#include "syncSnapshot.h"
#include "syncRaft.h"
int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; }

View File

@ -148,4 +148,40 @@ const char* syncUtilState2String(ESyncState state) {
} else {
return "TAOS_SYNC_STATE_UNKNOWN";
}
}
bool syncUtilCanPrint(char c) {
if (c >= 32 && c <= 126) {
return true;
} else {
return false;
}
}
char* syncUtilprintBin(char* ptr, uint32_t len) {
char* s = malloc(len + 1);
assert(s != NULL);
memset(s, 0, len + 1);
memcpy(s, ptr, len);
for (int i = 0; i < len; ++i) {
if (!syncUtilCanPrint(s[i])) {
s[i] = '.';
}
}
return s;
}
char* syncUtilprintBin2(char* ptr, uint32_t len) {
uint32_t len2 = len * 4 + 1;
char* s = malloc(len2);
assert(s != NULL);
memset(s, 0, len2);
char* p = s;
for (int i = 0; i < len; ++i) {
int n = sprintf(p, "%d,", ptr[i]);
p += n;
}
return s;
}

View File

@ -15,6 +15,8 @@ add_executable(syncUtilTest "")
add_executable(syncVotesGrantedTest "")
add_executable(syncVotesRespondTest "")
add_executable(syncIndexMgrTest "")
add_executable(syncLogStoreTest "")
add_executable(syncEntryTest "")
target_sources(syncTest
@ -85,6 +87,14 @@ target_sources(syncIndexMgrTest
PRIVATE
"syncIndexMgrTest.cpp"
)
target_sources(syncLogStoreTest
PRIVATE
"syncLogStoreTest.cpp"
)
target_sources(syncEntryTest
PRIVATE
"syncEntryTest.cpp"
)
target_include_directories(syncTest
@ -172,6 +182,16 @@ target_include_directories(syncIndexMgrTest
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncLogStoreTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncEntryTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest
@ -242,6 +262,14 @@ target_link_libraries(syncIndexMgrTest
sync
gtest_main
)
target_link_libraries(syncLogStoreTest
sync
gtest_main
)
target_link_libraries(syncEntryTest
sync
gtest_main
)
enable_testing()
@ -249,3 +277,5 @@ add_test(
NAME sync_test
COMMAND syncTest
)

View File

@ -2,6 +2,7 @@
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftStore.h"
void logTest() {

View File

@ -0,0 +1,81 @@
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
void test1() {
SSyncRaftEntry* pEntry = syncEntryBuild(10);
assert(pEntry != NULL);
pEntry->msgType = 1;
pEntry->originalRpcType = 2;
pEntry->seqNum = 3;
pEntry->isWeak = true;
pEntry->term = 100;
pEntry->index = 200;
strcpy(pEntry->data, "test1");
syncEntryPrint(pEntry);
syncEntryDestory(pEntry);
}
void test2() {
SyncClientRequest* pSyncMsg = syncClientRequestBuild(10);
pSyncMsg->originalRpcType = 33;
pSyncMsg->seqNum = 11;
pSyncMsg->isWeak = 1;
strcpy(pSyncMsg->data, "test2");
SSyncRaftEntry* pEntry = syncEntryBuild2(pSyncMsg, 100, 200);
syncEntryPrint(pEntry);
syncClientRequestDestroy(pSyncMsg);
syncEntryDestory(pEntry);
}
void test3() {
SSyncRaftEntry* pEntry = syncEntryBuild(10);
assert(pEntry != NULL);
pEntry->msgType = 11;
pEntry->originalRpcType = 22;
pEntry->seqNum = 33;
pEntry->isWeak = true;
pEntry->term = 44;
pEntry->index = 55;
strcpy(pEntry->data, "test3");
syncEntryPrint(pEntry);
uint32_t len;
char* serialized = syncEntrySerialize(pEntry, &len);
assert(serialized != NULL);
SSyncRaftEntry* pEntry2 = syncEntryDeserialize(serialized, len);
syncEntryPrint(pEntry2);
free(serialized);
syncEntryDestory(pEntry2);
syncEntryDestory(pEntry);
}
int main(int argc, char** argv) {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
test1();
test2();
test3();
return 0;
}

View File

@ -3,6 +3,7 @@
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "ttime.h"
void logTest() {
sTrace("--- sync log test: trace");
@ -13,24 +14,13 @@ void logTest() {
sFatal("--- sync log test: fatal");
}
void doSync() {
SSyncInfo syncInfo;
syncInfo.vgId = 1;
void *pTimer = NULL;
void *pTimerMgr = NULL;
int g = 300;
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->replicaNum = 3;
pCfg->nodeInfo[0].nodePort = 7010;
taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
pCfg->nodeInfo[1].nodePort = 7110;
taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn);
pCfg->nodeInfo[2].nodePort = 7210;
taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn);
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
assert(pSyncNode != NULL);
static void timerFp(void *param, void *tmrId) {
printf("param:%p, tmrId:%p, pTimer:%p, pTimerMgr:%p \n", param, tmrId, pTimer, pTimerMgr);
taosTmrReset(timerFp, 1000, param, pTimerMgr, &pTimer);
}
int main() {
@ -41,13 +31,12 @@ int main() {
logTest();
// ret = syncIOStart();
// assert(ret == 0);
ret = syncEnvStart();
assert(ret == 0);
// doSync();
// timer
pTimerMgr = taosTmrInit(1000, 50, 10000, "SYNC-ENV-TEST");
taosTmrStart(timerFp, 1000, &g, pTimerMgr);
while (1) {
taosMsleep(1000);

View File

@ -33,8 +33,7 @@ SSyncNode* syncNodeInit() {
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path");
snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path");
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;

View File

@ -30,8 +30,7 @@ SSyncNode* syncNodeInit() {
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path");
snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path");
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
@ -54,6 +53,7 @@ SSyncNode* syncNodeInit() {
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->pSyncNode = pSyncNode;
return pSyncNode;
@ -89,11 +89,11 @@ int main(int argc, char** argv) {
SSyncNode* pSyncNode = syncInitTest();
assert(pSyncNode != NULL);
char* serialized = syncNode2Str(pSyncNode);
printf("%s\n", serialized);
free(serialized);
syncNodePrint((char*)"syncInitTest", pSyncNode);
initRaftId(pSyncNode);
//--------------------------------------------------------------
return 0;
}
}

View File

@ -0,0 +1,141 @@
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 1;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
SSyncInfo syncInfo;
SSyncFSM* pFsm;
SWal* pWal;
SSyncNode* pSyncNode;
SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
int code = walInit();
assert(code == 0);
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = syncInfo.vgId;
walCfg.fsyncPeriod = 1000;
walCfg.retentionPeriod = 1000;
walCfg.rollPeriod = 1000;
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
pWal = walOpen("./wal_test", &walCfg);
assert(pWal != NULL);
syncInfo.pWal = pWal;
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = ports[i];
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
pSyncNode = syncNodeOpen(&syncInfo);
assert(pSyncNode != NULL);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->pSyncNode = pSyncNode;
return pSyncNode;
}
SSyncNode* syncInitTest() { return syncNodeInit(); }
void logStoreTest() {
logStorePrint(pSyncNode->pLogStore);
for (int i = 0; i < 5; ++i) {
int32_t dataLen = 10;
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
assert(pEntry != NULL);
pEntry->msgType = 1;
pEntry->originalRpcType = 2;
pEntry->seqNum = 3;
pEntry->isWeak = true;
pEntry->term = 100;
pEntry->index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
snprintf(pEntry->data, dataLen, "value%d", i);
//syncEntryPrint2((char*)"write entry:", pEntry);
pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry);
syncEntryDestory(pEntry);
}
logStorePrint(pSyncNode->pLogStore);
pSyncNode->pLogStore->truncate(pSyncNode->pLogStore, 3);
logStorePrint(pSyncNode->pLogStore);
}
void initRaftId(SSyncNode* pSyncNode) {
for (int i = 0; i < replicaNum; ++i) {
ids[i] = pSyncNode->replicasId[i];
char* s = syncUtilRaftId2Str(&ids[i]);
printf("raftId[%d] : %s\n", i, s);
free(s);
}
}
int main(int argc, char** argv) {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
}
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0);
ret = syncEnvStart();
assert(ret == 0);
pSyncNode = syncInitTest();
assert(pSyncNode != NULL);
//syncNodePrint((char*)"syncLogStoreTest", pSyncNode);
//initRaftId(pSyncNode);
logStoreTest();
return 0;
}

View File

@ -1,8 +1,10 @@
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
@ -13,59 +15,65 @@ void logTest() {
sFatal("--- sync log test: fatal");
}
uint16_t ports[3] = {7010, 7110, 7210};
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 3;
int32_t myIndex = 0;
SSyncNode* doSync(int myIndex) {
SSyncFSM* pFsm;
SRaftId ids[TSDB_MAX_REPLICA];
SSyncInfo syncInfo;
SSyncFSM* pFsm;
SSyncInfo syncInfo;
syncInfo.vgId = 1;
SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./path");
snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./wal_path");
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = 3;
pCfg->replicaNum = replicaNum;
pCfg->nodeInfo[0].nodePort = ports[0];
snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
pCfg->nodeInfo[1].nodePort = ports[1];
snprintf(pCfg->nodeInfo[1].nodeFqdn, sizeof(pCfg->nodeInfo[1].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn);
pCfg->nodeInfo[2].nodePort = ports[2];
snprintf(pCfg->nodeInfo[2].nodeFqdn, sizeof(pCfg->nodeInfo[2].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn);
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = ports[i];
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
assert(pSyncNode != NULL);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->pSyncNode = pSyncNode;
return pSyncNode;
}
void timerPingAll(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
syncNodePingAll(pSyncNode);
SSyncNode* syncInitTest() { return syncNodeInit(); }
void initRaftId(SSyncNode* pSyncNode) {
for (int i = 0; i < replicaNum; ++i) {
ids[i] = pSyncNode->replicasId[i];
char* s = syncUtilRaftId2Str(&ids[i]);
printf("raftId[%d] : %s\n", i, s);
free(s);
}
}
int main(int argc, char** argv) {
// taosInitLog((char*)"syncPingTest.log", 100000, 10);
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
int myIndex = 0;
myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
}
@ -76,30 +84,45 @@ int main(int argc, char** argv) {
ret = syncEnvStart();
assert(ret == 0);
SSyncNode* pSyncNode = doSync(myIndex);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
SSyncNode* pSyncNode = syncInitTest();
assert(pSyncNode != NULL);
syncNodePrint((char*)"----1", pSyncNode);
initRaftId(pSyncNode);
//---------------------------
sTrace("syncNodeStartPingTimer ...");
ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
syncNodePrint((char*)"----2", pSyncNode);
sTrace("sleep ...");
taosMsleep(10000);
sTrace("syncNodeStopPingTimer ...");
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
syncNodePrint((char*)"----3", pSyncNode);
taosMsleep(10000);
sTrace("sleep ...");
taosMsleep(5000);
sTrace("syncNodeStartPingTimer ...");
ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
syncNodePrint((char*)"----4", pSyncNode);
sTrace("sleep ...");
taosMsleep(10000);
sTrace("syncNodeStopPingTimer ...");
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
syncNodePrint((char*)"----5", pSyncNode);
while (1) {
sTrace("while 1 sleep ...");
taosMsleep(1000);
}

View File

@ -32,8 +32,7 @@ SSyncNode* syncNodeInit() {
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path");
snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path");
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;

View File

@ -32,8 +32,7 @@ SSyncNode* syncNodeInit() {
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path");
snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path");
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;