From b63995287d7a2848ce92aaf109ab90b1c5e60119 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 25 Feb 2022 17:55:20 +0800 Subject: [PATCH] add raft store --- include/libs/sync/sync.h | 20 ++++----- source/libs/sync/inc/syncElection.h | 2 +- source/libs/sync/inc/syncRaft.h | 6 +-- source/libs/sync/inc/syncReplication.h | 2 +- source/libs/sync/inc/syncTimeout.h | 2 +- source/libs/sync/inc/syncVoteMgr.h | 5 +-- source/libs/sync/test/CMakeLists.txt | 14 +++++++ source/libs/sync/test/syncEnvTest.cpp | 30 ++++++++++---- source/libs/sync/test/syncPingTest.cpp | 57 ++++++++++++++++++++++++++ 9 files changed, 112 insertions(+), 26 deletions(-) create mode 100644 source/libs/sync/test/syncPingTest.cpp diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index f5881837c6..03ec7c0eac 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -34,23 +34,23 @@ typedef enum { TAOS_SYNC_STATE_LEADER = 2, } ESyncState; -typedef struct { +typedef struct SSyncBuffer { void* data; size_t len; } SSyncBuffer; -typedef struct { - SyncNodeId nodeId; - uint16_t nodePort; // node sync Port - char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN +typedef struct SNodeInfo { + uint16_t nodePort; // node sync Port + char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN } SNodeInfo; -typedef struct { +typedef struct SSyncCfg { int32_t replicaNum; + int32_t myIndex; SNodeInfo nodeInfo[TSDB_MAX_REPLICA]; } SSyncCfg; -typedef struct { +typedef struct SNodesRole { int32_t replicaNum; SNodeInfo nodeInfo[TSDB_MAX_REPLICA]; ESyncState role[TSDB_MAX_REPLICA]; @@ -128,9 +128,9 @@ typedef struct SStateMgr { } SStateMgr; -typedef struct { - SyncGroupId vgId; - SSyncCfg syncCfg; +typedef struct SSyncInfo { + SyncGroupId vgId; + SSyncCfg syncCfg; } SSyncInfo; struct SSyncNode; diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index 0c86e9a6d3..7e9e637854 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -23,8 +23,8 @@ extern "C" { #include #include #include -#include "taosdef.h" #include "syncInt.h" +#include "taosdef.h" #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRaft.h b/source/libs/sync/inc/syncRaft.h index 3964b95f28..5852c0ec30 100644 --- a/source/libs/sync/inc/syncRaft.h +++ b/source/libs/sync/inc/syncRaft.h @@ -35,9 +35,9 @@ typedef struct SRaftId { typedef struct SRaft { SRaftId id; - SSyncLogStore *logStore; - SStateMgr *stateManager; - SSyncFSM *syncFsm; + SSyncLogStore* logStore; + SStateMgr* stateManager; + SSyncFSM* syncFsm; } SRaft; diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index bfe071853c..7f97ae9e49 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -23,8 +23,8 @@ extern "C" { #include #include #include -#include "taosdef.h" #include "syncInt.h" +#include "taosdef.h" #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncTimeout.h b/source/libs/sync/inc/syncTimeout.h index 2f5517069e..d9d6a17939 100644 --- a/source/libs/sync/inc/syncTimeout.h +++ b/source/libs/sync/inc/syncTimeout.h @@ -23,10 +23,10 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" -#include "syncInt.h" void onTimeout(SRaft *pRaft, void *pMsg); diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h index 93729a859c..cfcf58bee2 100644 --- a/source/libs/sync/inc/syncVoteMgr.h +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -20,12 +20,11 @@ extern "C" { #endif +#include #include #include -#include -#include "taosdef.h" #include "syncInt.h" - +#include "taosdef.h" #ifdef __cplusplus } diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 17405989f5..e655ac01be 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -1,5 +1,6 @@ add_executable(syncTest "") add_executable(syncEnvTest "") +add_executable(syncPingTest "") target_sources(syncTest @@ -10,6 +11,10 @@ target_sources(syncEnvTest PRIVATE "syncEnvTest.cpp" ) +target_sources(syncPingTest + PRIVATE + "syncPingTest.cpp" +) target_include_directories(syncTest @@ -22,6 +27,11 @@ target_include_directories(syncEnvTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncPingTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -32,6 +42,10 @@ target_link_libraries(syncEnvTest sync gtest_main ) +target_link_libraries(syncPingTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncEnvTest.cpp b/source/libs/sync/test/syncEnvTest.cpp index c682284b25..31dad593e6 100644 --- a/source/libs/sync/test/syncEnvTest.cpp +++ b/source/libs/sync/test/syncEnvTest.cpp @@ -13,6 +13,26 @@ void logTest() { sFatal("--- sync log test: fatal"); } +void doSync() { + SSyncInfo syncInfo; + syncInfo.vgId = 1; + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->replicaNum = 3; + + pCfg->nodeInfo[0].nodePort = 7010; + taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + + pCfg->nodeInfo[1].nodePort = 7110; + taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); + + pCfg->nodeInfo[2].nodePort = 7210; + taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); + + SSyncNode* pSyncNode = syncNodeStart(&syncInfo); + assert(pSyncNode != NULL); +} + int main() { taosInitLog((char*)"syncEnvTest.log", 100000, 10); tsAsyncLog = 0; @@ -20,17 +40,13 @@ int main() { logTest(); - int32_t ret = syncEnvStart(); + int32_t ret = syncIOStart(); assert(ret == 0); - ret = syncIOStart(); + ret = syncEnvStart(); assert(ret == 0); - SSyncInfo syncInfo; - syncInfo.vgId = 1; - - SSyncNode* pSyncNode = syncNodeStart(&syncInfo); - assert(pSyncNode != NULL); + doSync(); while (1) { taosMsleep(1000); diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp new file mode 100644 index 0000000000..5ed72fd56b --- /dev/null +++ b/source/libs/sync/test/syncPingTest.cpp @@ -0,0 +1,57 @@ +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +void doSync() { + SSyncInfo syncInfo; + syncInfo.vgId = 1; + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = 0; + pCfg->replicaNum = 3; + + pCfg->nodeInfo[0].nodePort = 7010; + taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + + pCfg->nodeInfo[1].nodePort = 7110; + taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); + + pCfg->nodeInfo[2].nodePort = 7210; + taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); + + SSyncNode* pSyncNode = syncNodeStart(&syncInfo); + assert(pSyncNode != NULL); +} + +int main() { + taosInitLog((char*)"syncPingTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int32_t ret = syncIOStart(); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + doSync(); + + while (1) { + taosMsleep(1000); + } + + return 0; +}