diff --git a/source/libs/sync/CMakeLists.txt b/source/libs/sync/CMakeLists.txt index cb38d7e363..37ee5194c8 100644 --- a/source/libs/sync/CMakeLists.txt +++ b/source/libs/sync/CMakeLists.txt @@ -13,8 +13,4 @@ target_include_directories( sync PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/sync" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" -) - -if(${BUILD_TEST}) - add_subdirectory(test) -endif(${BUILD_TEST}) +) \ No newline at end of file diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index b7c1c051cc..9ca0de19c5 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -23,7 +23,6 @@ extern "C" { #include #include #include -#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index 22f8eb464f..8b5cbf1da5 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -23,7 +23,6 @@ extern "C" { #include #include #include -#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index 7e9e637854..34dfdb3d09 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -23,7 +23,6 @@ extern "C" { #include #include #include -#include "syncInt.h" #include "taosdef.h" #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h deleted file mode 100644 index f1c4327b69..0000000000 --- a/source/libs/sync/inc/syncEnv.h +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_LIBS_SYNC_ENV_H -#define _TD_LIBS_SYNC_ENV_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include -#include -#include "syncInt.h" -#include "taosdef.h" -#include "trpc.h" - -typedef struct SSyncEnv { - void *pTimer; - void *pTimerManager; -} SSyncEnv; - -int32_t syncEnvStart(); - -int32_t syncEnvStop(); - -static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv); - -static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_LIBS_SYNC_ENV_H*/ diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h deleted file mode 100644 index 54a3d2b8c1..0000000000 --- a/source/libs/sync/inc/syncIO.h +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_LIBS_IO_H -#define _TD_LIBS_IO_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include -#include -#include "os.h" -#include "syncInt.h" -#include "taosdef.h" -#include "tqueue.h" -#include "trpc.h" - -typedef struct SSyncIO { - void * serverRpc; - void * clientRpc; - STaosQueue *pMsgQ; - STaosQset * pQset; - pthread_t tid; - int8_t isStart; - - SEpSet epSet; - - void *syncTimer; - void *syncTimerManager; - - int32_t (*start)(struct SSyncIO *ths); - int32_t (*stop)(struct SSyncIO *ths); - int32_t (*ping)(struct SSyncIO *ths); - int32_t (*onMessage)(struct SSyncIO *ths, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); - int32_t (*destroy)(struct SSyncIO *ths); - -} SSyncIO; - -int32_t syncIOStart(); -int32_t syncIOStop(); - -SSyncIO *syncIOCreate(); - -static int32_t doSyncIOStart(SSyncIO *io); -static int32_t doSyncIOStop(SSyncIO *io); -static int32_t doSyncIOPing(SSyncIO *io); -static int32_t doSyncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); -static int32_t doSyncIODestroy(SSyncIO *io); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_LIBS_IO_H*/ diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index e03835fc54..551ce83122 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -23,11 +23,7 @@ extern "C" { #include #include #include -#include "sync.h" #include "taosdef.h" -#include "tlog.h" - -extern int32_t sDebugFlag; #define sFatal(...) \ { \ @@ -76,12 +72,6 @@ typedef struct SSyncNode { int64_t rid; } SSyncNode; -SSyncNode* syncNodeStart(const SSyncInfo* pSyncInfo); -void syncNodeStop(SSyncNode* pSyncNode); - -// int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); -int32_t syncNodeForwardToPeer(SSyncNode* pSyncNode, const SSyncBuffer* pBuf, bool isWeak); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 41a19eb49a..dc74526c73 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -23,7 +23,7 @@ extern "C" { #include #include #include -#include "syncInt.h" +#include "sync.h" #include "syncRaftEntry.h" #include "taosdef.h" @@ -41,26 +41,28 @@ typedef enum ESyncMessageType { typedef struct SyncPing { ESyncMessageType msgType; const SSyncBuffer *pData; -} SyncPing; +} SyncPing, RaftPing; + typedef struct SyncPingReply { ESyncMessageType msgType; const SSyncBuffer *pData; -} SyncPingReply; +} SyncPingReply, RaftPingReply; typedef struct SyncClientRequest { ESyncMessageType msgType; const SSyncBuffer *pData; int64_t seqNum; bool isWeak; -} SyncClientRequest; +} SyncClientRequest, RaftClientRequest; typedef struct SyncClientRequestReply { ESyncMessageType msgType; int32_t errCode; const SSyncBuffer *pErrMsg; const SSyncBuffer *pLeaderHint; -} SyncClientRequestReply; +} SyncClientRequestReply, RaftClientRequestReply; + typedef struct SyncRequestVote { ESyncMessageType msgType; @@ -69,7 +71,7 @@ typedef struct SyncRequestVote { SyncGroupId vgId; SyncIndex lastLogIndex; SyncTerm lastLogTerm; -} SyncRequestVote; +} SyncRequestVote, RaftRequestVote; typedef struct SyncRequestVoteReply { ESyncMessageType msgType; @@ -77,7 +79,7 @@ typedef struct SyncRequestVoteReply { SyncNodeId nodeId; SyncGroupId vgId; bool voteGranted; -} SyncRequestVoteReply; +} SyncRequestVoteReply, RaftRequestVoteReply; typedef struct SyncAppendEntries { ESyncMessageType msgType; @@ -88,7 +90,7 @@ typedef struct SyncAppendEntries { int32_t entryCount; SSyncRaftEntry * logEntries; SyncIndex commitIndex; -} SyncAppendEntries; +} SyncAppendEntries, RaftAppendEntries; typedef struct SyncAppendEntriesReply { ESyncMessageType msgType; @@ -96,7 +98,7 @@ typedef struct SyncAppendEntriesReply { SyncNodeId nodeId; bool success; SyncIndex matchIndex; -} SyncAppendEntriesReply; +} SyncAppendEntriesReply, RaftAppendEntriesReply; #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRaft.h b/source/libs/sync/inc/syncRaft.h index 5852c0ec30..6a3140d930 100644 --- a/source/libs/sync/inc/syncRaft.h +++ b/source/libs/sync/inc/syncRaft.h @@ -23,24 +23,59 @@ extern "C" { #include #include #include -#include "syncInt.h" +#include "sync.h" #include "syncMessage.h" #include "taosdef.h" typedef struct SRaftId { - SyncNodeId addr; + SyncNodeId nodeId; SyncGroupId vgId; } SRaftId; typedef struct SRaft { SRaftId id; + void* data; - SSyncLogStore* logStore; - SStateMgr* stateManager; - SSyncFSM* syncFsm; + int32_t (*FpPing)(struct SRaft* ths, const RaftPing* pMsg); + + int32_t (*FpOnPing)(struct SRaft* ths, RaftPing* pMsg); + + int32_t (*FpOnPingReply)(struct SRaft* ths, RaftPingReply* pMsg); + + int32_t (*FpRequestVote)(struct SRaft* ths, const RaftRequestVote* pMsg); + + int32_t (*FpOnRequestVote)(struct SRaft* ths, RaftRequestVote* pMsg); + + int32_t (*FpOnRequestVoteReply)(struct SRaft* ths, RaftRequestVoteReply* pMsg); + + int32_t (*FpAppendEntries)(struct SRaft* ths, const RaftAppendEntries* pMsg); + + int32_t (*FpOnAppendEntries)(struct SRaft* ths, RaftAppendEntries* pMsg); + + int32_t (*FpOnAppendEntriesReply)(struct SRaft* ths, RaftAppendEntriesReply* pMsg); } SRaft; +SRaft* raftCreate(SRaftId raftId, void* data); + +static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg); + +static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg); + +static int32_t onRaftPingReply(struct SRaft* ths, RaftPingReply* pMsg); + +static int32_t doRaftRequestVote(struct SRaft* ths, const RaftRequestVote* pMsg); + +static int32_t onRaftRequestVote(struct SRaft* ths, RaftRequestVote* pMsg); + +static int32_t onRaftRequestVoteReply(struct SRaft* ths, RaftRequestVoteReply* pMsg); + +static int32_t doRaftAppendEntries(struct SRaft* ths, const RaftAppendEntries* pMsg); + +static int32_t onRaftAppendEntries(struct SRaft* ths, RaftAppendEntries* pMsg); + +static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesReply* pMsg); + int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak); static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft); diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index 516bef4d48..adc82f2c5d 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_LIBS_SYNC_RAFT_ENTRY_H -#define _TD_LIBS_SYNC_RAFT_ENTRY_H +#ifndef _TD_LIBS_TPL_H +#define _TD_LIBS_TPL_H #ifdef __cplusplus extern "C" { @@ -23,7 +23,7 @@ extern "C" { #include #include #include -#include "syncInt.h" +#include "sync.h" #include "taosdef.h" typedef struct SSyncRaftEntry { @@ -37,4 +37,4 @@ typedef struct SSyncRaftEntry { } #endif -#endif /*_TD_LIBS_SYNC_RAFT_ENTRY_H*/ +#endif /*_TD_LIBS_TPL_H*/ diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index ee971062cf..8c4b5116ea 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -23,7 +23,7 @@ extern "C" { #include #include #include -#include "syncInt.h" +#include "sync.h" #include "taosdef.h" int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf); diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index bdaeb81aee..4cb852f34a 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -23,36 +23,17 @@ extern "C" { #include #include #include -#include "cJSON.h" -#include "syncInt.h" +#include "sync.h" #include "syncRaft.h" #include "taosdef.h" -#define RAFT_STORE_BLOCK_SIZE 512 -#define RAFT_STORE_PATH_LEN 128 +int32_t currentTerm(SyncTerm *pCurrentTerm); -typedef struct SRaftStore { - SyncTerm currentTerm; - SRaftId voteFor; - FileFd fd; - char path[RAFT_STORE_PATH_LEN]; -} SRaftStore; +int32_t persistCurrentTerm(SyncTerm currentTerm); -SRaftStore *raftStoreOpen(const char *path); +int32_t voteFor(SRaftId *pRaftId); -static int32_t raftStoreInit(SRaftStore *pRaftStore); - -int32_t raftStoreClose(SRaftStore *pRaftStore); - -int32_t raftStorePersist(SRaftStore *pRaftStore); - -static bool raftStoreFileExist(char *path); - -int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len); - -int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len); - -void raftStorePrint(SRaftStore *pRaftStore); +int32_t persistVoteFor(SRaftId *pRaftId); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 7f97ae9e49..40c5ff790b 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -23,7 +23,6 @@ extern "C" { #include #include #include -#include "syncInt.h" #include "taosdef.h" #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncRequestVote.h b/source/libs/sync/inc/syncRequestVote.h index c2eca55151..3ff96bbe8f 100644 --- a/source/libs/sync/inc/syncRequestVote.h +++ b/source/libs/sync/inc/syncRequestVote.h @@ -23,7 +23,6 @@ extern "C" { #include #include #include -#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncRequestVoteReply.h b/source/libs/sync/inc/syncRequestVoteReply.h index 38068dd0e2..033ac89bc2 100644 --- a/source/libs/sync/inc/syncRequestVoteReply.h +++ b/source/libs/sync/inc/syncRequestVoteReply.h @@ -23,7 +23,6 @@ extern "C" { #include #include #include -#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 89fcb230fb..3b6121578a 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -23,7 +23,7 @@ extern "C" { #include #include #include -#include "syncInt.h" +#include "sync.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncTimeout.h b/source/libs/sync/inc/syncTimeout.h index d9d6a17939..8159d2566c 100644 --- a/source/libs/sync/inc/syncTimeout.h +++ b/source/libs/sync/inc/syncTimeout.h @@ -23,7 +23,6 @@ extern "C" { #include #include #include -#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h deleted file mode 100644 index cfcf58bee2..0000000000 --- a/source/libs/sync/inc/syncVoteMgr.h +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_LIBS_SYNC_VOTG_MGR_H -#define _TD_LIBS_SYNC_VOTG_MGR_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include -#include -#include "syncInt.h" -#include "taosdef.h" - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_LIBS_SYNC_VOTG_MGR_H*/ diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 2b9c59ec92..65654564ab 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -14,6 +14,7 @@ */ #include "syncAppendEntries.h" +#include "sync.h" void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { // TLA+ Spec diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 05734237b9..20235ef720 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -14,6 +14,7 @@ */ #include "syncAppendEntriesReply.h" +#include "sync.h" void onAppendEntriesReply(SRaft *pRaft, const SyncAppendEntriesReply *pMsg) { // TLA+ Spec diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 329105e2a1..738fc4c5e1 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "syncElection.h" +#include "sync.h" diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c deleted file mode 100644 index e71cf55cb1..0000000000 --- a/source/libs/sync/src/syncEnv.c +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "syncEnv.h" -#include - -SSyncEnv *gSyncEnv = NULL; - -int32_t syncEnvStart() { - int32_t ret; - gSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv)); - assert(gSyncEnv != NULL); - ret = doSyncEnvStart(gSyncEnv); - return ret; -} - -int32_t syncEnvStop() { - int32_t ret = doSyncEnvStop(gSyncEnv); - return ret; -} - -static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv) { return 0; } - -static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { return 0; } diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c deleted file mode 100644 index 0e32d9ac50..0000000000 --- a/source/libs/sync/src/syncIO.c +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "syncIO.h" -#include -#include "syncOnMessage.h" -#include "tglobal.h" -#include "ttimer.h" -#include "tutil.h" - -int32_t syncIOStart() { return 0; } - -int32_t syncIOStop() { return 0; } - -static void syncTick(void *param, void *tmrId) { - SSyncIO *io = (SSyncIO *)param; - sDebug("syncTick ... "); - - SRpcMsg rpcMsg; - rpcMsg.pCont = rpcMallocCont(10); - snprintf(rpcMsg.pCont, 10, "TICK"); - rpcMsg.contLen = 10; - rpcMsg.handle = NULL; - rpcMsg.msgType = 2; - - SRpcMsg *pTemp; - - pTemp = taosAllocateQitem(sizeof(SRpcMsg)); - memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg)); - - taosWriteQitem(io->pMsgQ, pTemp); - - io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager); -} - -void *syncConsumer(void *param) { - SSyncIO *io = param; - - STaosQall *qall; - SRpcMsg * pRpcMsg, rpcMsg; - int type; - - qall = taosAllocateQall(); - - while (1) { - int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL); - sDebug("%d sync-io msgs are received", numOfMsgs); - if (numOfMsgs <= 0) break; - - for (int i = 0; i < numOfMsgs; ++i) { - taosGetQitem(qall, (void **)&pRpcMsg); - sDebug("sync-io recv type:%d msg:%s", pRpcMsg->msgType, (char *)(pRpcMsg->pCont)); - } - - taosResetQitems(qall); - for (int i = 0; i < numOfMsgs; ++i) { - taosGetQitem(qall, (void **)&pRpcMsg); - rpcFreeCont(pRpcMsg->pCont); - - if (pRpcMsg->handle != NULL) { - int msgSize = 128; - memset(&rpcMsg, 0, sizeof(rpcMsg)); - rpcMsg.pCont = rpcMallocCont(msgSize); - rpcMsg.contLen = msgSize; - rpcMsg.handle = pRpcMsg->handle; - rpcMsg.code = 0; - rpcSendResponse(&rpcMsg); - } - - taosFreeQitem(pRpcMsg); - } - } - - taosFreeQall(qall); - return NULL; -} - -static int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { - // app shall retrieve the auth info based on meterID from DB or a data file - // demo code here only for simple demo - int ret = 0; - return ret; -} - -static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { - sDebug("processResponse ... "); - rpcFreeCont(pMsg->pCont); -} - -static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { - SSyncIO *io = pParent; - SRpcMsg *pTemp; - - pTemp = taosAllocateQitem(sizeof(SRpcMsg)); - memcpy(pTemp, pMsg, sizeof(SRpcMsg)); - - sDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); - taosWriteQitem(io->pMsgQ, pTemp); -} - -SSyncIO *syncIOCreate() { - SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO)); - memset(io, 0, sizeof(*io)); - - io->pMsgQ = taosOpenQueue(); - io->pQset = taosOpenQset(); - taosAddIntoQset(io->pQset, io->pMsgQ, NULL); - - io->start = doSyncIOStart; - io->stop = doSyncIOStop; - io->ping = doSyncIOPing; - io->onMessage = doSyncIOOnMessage; - io->destroy = doSyncIODestroy; - - return io; -} - -static int32_t doSyncIOStart(SSyncIO *io) { - taosBlockSIGPIPE(); - - tsRpcForceTcp = 1; - - // cient rpc init - { - SRpcInit rpcInit; - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = 0; - rpcInit.label = "SYNC-IO-CLIENT"; - rpcInit.numOfThreads = 1; - rpcInit.cfp = processResponse; - rpcInit.sessions = 100; - rpcInit.idleTime = 100; - rpcInit.user = "sync-io"; - rpcInit.secret = "sync-io"; - rpcInit.ckey = "key"; - rpcInit.spi = 0; - rpcInit.connType = TAOS_CONN_CLIENT; - - io->clientRpc = rpcOpen(&rpcInit); - if (io->clientRpc == NULL) { - sError("failed to initialize RPC"); - return -1; - } - } - - // server rpc init - { - SRpcInit rpcInit; - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = 38000; - rpcInit.label = "SYNC-IO-SERVER"; - rpcInit.numOfThreads = 1; - rpcInit.cfp = processRequestMsg; - rpcInit.sessions = 1000; - rpcInit.idleTime = 2 * 1500; - rpcInit.afp = retrieveAuthInfo; - rpcInit.parent = io; - rpcInit.connType = TAOS_CONN_SERVER; - - void *pRpc = rpcOpen(&rpcInit); - if (pRpc == NULL) { - sError("failed to start RPC server"); - return -1; - } - } - - io->epSet.inUse = 0; - addEpIntoEpSet(&io->epSet, "127.0.0.1", 38000); - - // start consumer thread - { - if (pthread_create(&io->tid, NULL, syncConsumer, io) != 0) { - sError("failed to create sync consumer thread since %s", strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - } - - // start tmr thread - io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); - io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager); - - return 0; -} - -static int32_t doSyncIOStop(SSyncIO *io) { - atomic_store_8(&io->isStart, 0); - pthread_join(io->tid, NULL); - return 0; -} - -static int32_t doSyncIOPing(SSyncIO *io) { - SRpcMsg rpcMsg, rspMsg; - - rpcMsg.pCont = rpcMallocCont(10); - snprintf(rpcMsg.pCont, 10, "ping"); - rpcMsg.contLen = 10; - rpcMsg.handle = NULL; - rpcMsg.msgType = 1; - - rpcSendRequest(io->clientRpc, &io->epSet, &rpcMsg, NULL); - - return 0; -} - -static int32_t doSyncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; } - -static int32_t doSyncIODestroy(SSyncIO *io) { - int8_t start = atomic_load_8(&io->isStart); - assert(start == 0); - - if (io->serverRpc != NULL) { - free(io->serverRpc); - io->serverRpc = NULL; - } - - if (io->clientRpc != NULL) { - free(io->clientRpc); - io->clientRpc = NULL; - } - - if (io->pMsgQ != NULL) { - free(io->pMsgQ); - io->pMsgQ = NULL; - } - - if (io->pQset != NULL) { - free(io->pQset); - io->pQset = NULL; - } - - return 0; -} \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 0e7d83d39a..fbb969eb1c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -14,13 +14,10 @@ */ #include -#include "syncEnv.h" +#include "sync.h" #include "syncInt.h" -int32_t syncInit() { - int32_t ret = syncEnvStart(); - return ret; -} +int32_t syncInit() { return 0; } void syncCleanUp() {} @@ -34,9 +31,4 @@ int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { r ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; } -void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} - -SSyncNode* syncNodeStart(const SSyncInfo* pSyncInfo) { return NULL; } -void syncNodeStop(SSyncNode* pSyncNode) {} - -int32_t syncNodeForwardToPeer(SSyncNode* pSyncNode, const SSyncBuffer* pBuf, bool isWeak) { return 0; } \ No newline at end of file +void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 8937303725..dcfc940f76 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -14,6 +14,7 @@ */ #include "syncMessage.h" +#include "sync.h" #include "syncRaft.h" void onMessage(SRaft *pRaft, void *pMsg) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncOnMessage.c b/source/libs/sync/src/syncOnMessage.c index 19a97ee156..738fc4c5e1 100644 --- a/source/libs/sync/src/syncOnMessage.c +++ b/source/libs/sync/src/syncOnMessage.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "syncOnMessage.h" +#include "sync.h" diff --git a/source/libs/sync/src/syncRaft.c b/source/libs/sync/src/syncRaft.c index f0a29917e0..1a01aefb0b 100644 --- a/source/libs/sync/src/syncRaft.c +++ b/source/libs/sync/src/syncRaft.c @@ -14,6 +14,47 @@ */ #include "syncRaft.h" +#include "sync.h" + +SRaft* raftCreate(SRaftId raftId, void* data) { + SRaft* pRaft = (SRaft*)malloc(sizeof(SRaft)); + assert(pRaft != NULL); + + pRaft->id = raftId; + pRaft->data = data; + + pRaft->FpPing = doRaftPing; + pRaft->FpOnPing = onRaftPing; + pRaft->FpOnPingReply = onRaftPingReply; + + pRaft->FpRequestVote = doRaftRequestVote; + pRaft->FpOnRequestVote = onRaftRequestVote; + pRaft->FpOnRequestVoteReply = onRaftRequestVoteReply; + + pRaft->FpAppendEntries = doRaftAppendEntries; + pRaft->FpOnAppendEntries = onRaftAppendEntries; + pRaft->FpOnAppendEntriesReply = onRaftAppendEntriesReply; + + return pRaft; +} + +static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg) { return 0; } + +static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg) { return 0; } + +static int32_t onRaftPingReply(struct SRaft* ths, RaftPingReply* pMsg) { return 0; } + +static int32_t doRaftRequestVote(struct SRaft* ths, const RaftRequestVote* pMsg) { return 0; } + +static int32_t onRaftRequestVote(struct SRaft* ths, RaftRequestVote* pMsg) { return 0; } + +static int32_t onRaftRequestVoteReply(struct SRaft* ths, RaftRequestVoteReply* pMsg) { return 0; } + +static int32_t doRaftAppendEntries(struct SRaft* ths, const RaftAppendEntries* pMsg) { return 0; } + +static int32_t onRaftAppendEntries(struct SRaft* ths, RaftAppendEntries* pMsg) { return 0; } + +static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesReply* pMsg) { return 0; } int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; } diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index e525d3c7c2..738fc4c5e1 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "syncRaftEntry.h" +#include "sync.h" diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 37bb3ce48c..4a5fc201b0 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -14,6 +14,7 @@ */ #include "syncRaftLog.h" +#include "sync.h" int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf) { return 0; } diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 4391f5d25c..d45e53132c 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -14,118 +14,12 @@ */ #include "syncRaftStore.h" -#include "cJSON.h" +#include "sync.h" -SRaftStore *raftStoreOpen(const char *path) { - int32_t ret; +int32_t currentTerm(SyncTerm *pCurrentTerm) { return 0; } - SRaftStore *pRaftStore = malloc(sizeof(SRaftStore)); - if (pRaftStore == NULL) { - sError("raftStoreOpen malloc error"); - return NULL; - } - memset(pRaftStore, 0, sizeof(*pRaftStore)); - snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path); +int32_t persistCurrentTerm(SyncTerm currentTerm) { return 0; } - char storeBuf[RAFT_STORE_BLOCK_SIZE]; - memset(storeBuf, 0, sizeof(storeBuf)); +int32_t voteFor(SRaftId *pRaftId) { return 0; } - if (!raftStoreFileExist(pRaftStore->path)) { - ret = raftStoreInit(pRaftStore); - assert(ret == 0); - } - - pRaftStore->fd = taosOpenFileReadWrite(pRaftStore->path); - if (pRaftStore->fd < 0) { - return NULL; - } - - int len = taosReadFile(pRaftStore->fd, storeBuf, sizeof(storeBuf)); - assert(len == RAFT_STORE_BLOCK_SIZE); - - ret = raftStoreDeserialize(pRaftStore, storeBuf, len); - assert(ret == 0); - - return pRaftStore; -} - -static int32_t raftStoreInit(SRaftStore *pRaftStore) { - pRaftStore->fd = taosOpenFileCreateWrite(pRaftStore->path); - if (pRaftStore->fd < 0) { - return -1; - } - - pRaftStore->currentTerm = 0; - pRaftStore->voteFor.addr = 0; - pRaftStore->voteFor.vgId = 0; - - int32_t ret = raftStorePersist(pRaftStore); - assert(ret == 0); - - taosCloseFile(pRaftStore->fd); - return 0; -} - -int32_t raftStoreClose(SRaftStore *pRaftStore) { - taosCloseFile(pRaftStore->fd); - free(pRaftStore); - return 0; -} - -int32_t raftStorePersist(SRaftStore *pRaftStore) { - int32_t ret; - char storeBuf[RAFT_STORE_BLOCK_SIZE]; - - ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); - assert(ret == 0); - - taosLSeekFile(pRaftStore->fd, 0, SEEK_SET); - - ret = taosWriteFile(pRaftStore->fd, storeBuf, sizeof(storeBuf)); - assert(ret == RAFT_STORE_BLOCK_SIZE); - - fsync(pRaftStore->fd); - return 0; -} - -static bool raftStoreFileExist(char *path) { return taosStatFile(path, NULL, NULL) >= 0; } - -int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { - cJSON *pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "current_term", pRaftStore->currentTerm); - cJSON_AddNumberToObject(pRoot, "vote_for_addr", pRaftStore->voteFor.addr); - cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId); - - char *serialized = cJSON_Print(pRoot); - int len2 = strlen(serialized); - assert(len2 < len); - memset(buf, 0, len); - snprintf(buf, len, "%s", serialized); - free(serialized); - - cJSON_Delete(pRoot); - return 0; -} - -int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { - assert(len > 0 && len <= RAFT_STORE_BLOCK_SIZE); - cJSON *pRoot = cJSON_Parse(buf); - - cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term"); - pRaftStore->currentTerm = pCurrentTerm->valueint; - - cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr"); - pRaftStore->voteFor.addr = pVoteForAddr->valueint; - - cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid"); - pRaftStore->voteFor.vgId = pVoteForVgid->valueint; - - cJSON_Delete(pRoot); - return 0; -} - -void raftStorePrint(SRaftStore *pRaftStore) { - char storeBuf[RAFT_STORE_BLOCK_SIZE]; - raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); - printf("%s\n", storeBuf); -} +int32_t persistVoteFor(SRaftId *pRaftId) { return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 4cea7c150e..738fc4c5e1 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "syncReplication.h" +#include "sync.h" diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 7aee47b8e4..88056c95ff 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -14,6 +14,7 @@ */ #include "syncRequestVote.h" +#include "sync.h" void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { // TLA+ Spec diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index a9c88a7975..4ca1b1343f 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -14,6 +14,7 @@ */ #include "syncRequestVoteReply.h" +#include "sync.h" void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg) { // TLA+ Spec diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index da194780ff..8a27f097d1 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -14,6 +14,7 @@ */ #include "syncSnapshot.h" +#include "sync.h" #include "syncRaft.h" int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index e27df55d07..206dd70046 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -14,5 +14,6 @@ */ #include "syncTimeout.h" +#include "sync.h" void onTimeout(SRaft *pRaft, void *pMsg) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c deleted file mode 100644 index 02cf4ac033..0000000000 --- a/source/libs/sync/src/syncVoteMgr.c +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "syncVoteMgr.h" diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt deleted file mode 100644 index e655ac01be..0000000000 --- a/source/libs/sync/test/CMakeLists.txt +++ /dev/null @@ -1,55 +0,0 @@ -add_executable(syncTest "") -add_executable(syncEnvTest "") -add_executable(syncPingTest "") - - -target_sources(syncTest - PRIVATE - "syncTest.cpp" -) -target_sources(syncEnvTest - PRIVATE - "syncEnvTest.cpp" -) -target_sources(syncPingTest - PRIVATE - "syncPingTest.cpp" -) - - -target_include_directories(syncTest - PUBLIC - "${CMAKE_SOURCE_DIR}/include/libs/sync" - "${CMAKE_CURRENT_SOURCE_DIR}/../inc" -) -target_include_directories(syncEnvTest - PUBLIC - "${CMAKE_SOURCE_DIR}/include/libs/sync" - "${CMAKE_CURRENT_SOURCE_DIR}/../inc" -) -target_include_directories(syncPingTest - PUBLIC - "${CMAKE_SOURCE_DIR}/include/libs/sync" - "${CMAKE_CURRENT_SOURCE_DIR}/../inc" -) - - -target_link_libraries(syncTest - sync - gtest_main -) -target_link_libraries(syncEnvTest - sync - gtest_main -) -target_link_libraries(syncPingTest - sync - gtest_main -) - - -enable_testing() -add_test( - NAME sync_test - COMMAND syncTest -) diff --git a/source/libs/sync/test/syncEnvTest.cpp b/source/libs/sync/test/syncEnvTest.cpp deleted file mode 100644 index 31dad593e6..0000000000 --- a/source/libs/sync/test/syncEnvTest.cpp +++ /dev/null @@ -1,56 +0,0 @@ -#include "syncEnv.h" -#include -#include "syncIO.h" -#include "syncInt.h" -#include "syncRaftStore.h" - -void logTest() { - sTrace("--- sync log test: trace"); - sDebug("--- sync log test: debug"); - sInfo("--- sync log test: info"); - sWarn("--- sync log test: warn"); - sError("--- sync log test: error"); - sFatal("--- sync log test: fatal"); -} - -void doSync() { - SSyncInfo syncInfo; - syncInfo.vgId = 1; - - SSyncCfg* pCfg = &syncInfo.syncCfg; - pCfg->replicaNum = 3; - - pCfg->nodeInfo[0].nodePort = 7010; - taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); - - pCfg->nodeInfo[1].nodePort = 7110; - taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); - - pCfg->nodeInfo[2].nodePort = 7210; - taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); - - SSyncNode* pSyncNode = syncNodeStart(&syncInfo); - assert(pSyncNode != NULL); -} - -int main() { - taosInitLog((char*)"syncEnvTest.log", 100000, 10); - tsAsyncLog = 0; - sDebugFlag = 143 + 64; - - logTest(); - - int32_t ret = syncIOStart(); - assert(ret == 0); - - ret = syncEnvStart(); - assert(ret == 0); - - doSync(); - - while (1) { - taosMsleep(1000); - } - - return 0; -} diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp deleted file mode 100644 index 5ed72fd56b..0000000000 --- a/source/libs/sync/test/syncPingTest.cpp +++ /dev/null @@ -1,57 +0,0 @@ -#include -#include "syncEnv.h" -#include "syncIO.h" -#include "syncInt.h" -#include "syncRaftStore.h" - -void logTest() { - sTrace("--- sync log test: trace"); - sDebug("--- sync log test: debug"); - sInfo("--- sync log test: info"); - sWarn("--- sync log test: warn"); - sError("--- sync log test: error"); - sFatal("--- sync log test: fatal"); -} - -void doSync() { - SSyncInfo syncInfo; - syncInfo.vgId = 1; - - SSyncCfg* pCfg = &syncInfo.syncCfg; - pCfg->myIndex = 0; - pCfg->replicaNum = 3; - - pCfg->nodeInfo[0].nodePort = 7010; - taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); - - pCfg->nodeInfo[1].nodePort = 7110; - taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); - - pCfg->nodeInfo[2].nodePort = 7210; - taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); - - SSyncNode* pSyncNode = syncNodeStart(&syncInfo); - assert(pSyncNode != NULL); -} - -int main() { - taosInitLog((char*)"syncPingTest.log", 100000, 10); - tsAsyncLog = 0; - sDebugFlag = 143 + 64; - - logTest(); - - int32_t ret = syncIOStart(); - assert(ret == 0); - - ret = syncEnvStart(); - assert(ret == 0); - - doSync(); - - while (1) { - taosMsleep(1000); - } - - return 0; -} diff --git a/source/libs/sync/test/syncTest.cpp b/source/libs/sync/test/syncTest.cpp index 12b0905fa0..47566d537e 100644 --- a/source/libs/sync/test/syncTest.cpp +++ b/source/libs/sync/test/syncTest.cpp @@ -1,57 +1,7 @@ #include -#include "syncIO.h" -#include "syncInt.h" -#include "syncRaftStore.h" - -void *pingFunc(void *param) { - SSyncIO *io = (SSyncIO *)param; - while (1) { - sDebug("io->ping"); - io->ping(io); - sleep(1); - } - return NULL; -} int main() { - taosInitLog((char *)"syncTest.log", 100000, 10); - tsAsyncLog = 0; - sDebugFlag = 143 + 64; - - sTrace("sync log test: trace"); - sDebug("sync log test: debug"); - sInfo("sync log test: info"); - sWarn("sync log test: warn"); - sError("sync log test: error"); - sFatal("sync log test: fatal"); - - SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json"); - assert(pRaftStore != NULL); - - raftStorePrint(pRaftStore); - - pRaftStore->currentTerm = 100; - pRaftStore->voteFor.addr = 200; - pRaftStore->voteFor.vgId = 300; - - raftStorePrint(pRaftStore); - - raftStorePersist(pRaftStore); - - sDebug("sync test"); - - SSyncIO *syncIO = syncIOCreate(); - assert(syncIO != NULL); - - syncIO->start(syncIO); - - sleep(2); - - pthread_t tid; - pthread_create(&tid, NULL, pingFunc, syncIO); - - while (1) { - sleep(1); - } - return 0; + printf("test \n"); + return 0; } +