diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index fddf18c571..ccbeb00bfd 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -31,9 +31,9 @@ typedef int64_t SyncIndex; typedef uint64_t SyncTerm; typedef enum { - TAOS_SYNC_STATE_FOLLOWER = 0, - TAOS_SYNC_STATE_CANDIDATE = 1, - TAOS_SYNC_STATE_LEADER = 2, + TAOS_SYNC_STATE_FOLLOWER = 100, + TAOS_SYNC_STATE_CANDIDATE = 101, + TAOS_SYNC_STATE_LEADER = 102, } ESyncState; typedef struct SSyncBuffer { @@ -134,6 +134,7 @@ typedef struct SSyncInfo { SyncGroupId vgId; SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; + char walPath[TSDB_FILENAME_LEN]; SSyncFSM* pFsm; void* rpcClient; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 79412febd9..fdacb87481 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "cJSON.h" #include "sync.h" #include "taosdef.h" #include "tglobal.h" @@ -189,6 +190,7 @@ typedef struct SSyncNode { SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); void syncNodeClose(SSyncNode* pSyncNode); +cJSON* syncNode2Json(const SSyncNode* pSyncNode); int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index eb1e888254..6cb1105b2c 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -42,9 +42,12 @@ void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest); void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest); // ---- misc ---- -int32_t syncUtilRand(int32_t max); -int32_t syncUtilElectRandomMS(); -int32_t syncUtilQuorum(int32_t replicaNum); +int32_t syncUtilRand(int32_t max); +int32_t syncUtilElectRandomMS(); +int32_t syncUtilQuorum(int32_t replicaNum); +cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p); +cJSON* syncUtilRaftId2Json(const SRaftId* p); +const char* syncUtilState2String(ESyncState state); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 836f1b0e83..aebbd4d337 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -170,6 +170,71 @@ void syncNodeClose(SSyncNode* pSyncNode) { free(pSyncNode); } +cJSON* syncNode2Json(const SSyncNode* pSyncNode) { + char u64buf[128]; + cJSON* pRoot = cJSON_CreateObject(); + + // init by SSyncInfo + cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId); + cJSON_AddStringToObject(pRoot, "path", pSyncNode->path); + cJSON_AddStringToObject(pRoot, "walPath", pSyncNode->walPath); + + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->rpcClient); + cJSON_AddStringToObject(pRoot, "rpcClient", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg); + cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->queue); + cJSON_AddStringToObject(pRoot, "queue", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg); + cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf); + + // init internal + cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->me); + cJSON_AddItemToObject(pRoot, "me", pMe); + cJSON* pRaftId = syncUtilRaftId2Json(&pSyncNode->raftId); + cJSON_AddItemToObject(pRoot, "raftId", pRaftId); + + cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum); + cJSON* pPeers = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "peers", pPeers); + for (int i = 0; i < pSyncNode->peersNum; ++i) { + cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peers[i])); + } + cJSON* pPeersId = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "peersId", pPeersId); + for (int i = 0; i < pSyncNode->peersNum; ++i) { + cJSON_AddItemToArray(pPeersId, syncUtilRaftId2Json(&pSyncNode->peersId[i])); + } + + cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncNode->replicaNum); + cJSON* pReplicasId = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "replicasId", pReplicasId); + for (int i = 0; i < pSyncNode->replicaNum; ++i) { + cJSON_AddItemToArray(pReplicasId, syncUtilRaftId2Json(&pSyncNode->replicasId[i])); + } + + // raft algorithm + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pFsm); + cJSON_AddStringToObject(pRoot, "pFsm", u64buf); + cJSON_AddNumberToObject(pRoot, "quorum", pSyncNode->quorum); + cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache); + cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache); + + // tla+ server vars + cJSON_AddStringToObject(pRoot, "state", syncUtilState2String(pSyncNode->state)); + + // tla+ candidate vars + + // tla+ leader vars + + // tla+ log vars + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SSyncNode", pRoot); + return pJson; +} + int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 1e62301814..04be3f33ac 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -99,4 +99,46 @@ int32_t syncUtilRand(int32_t max) { return rand() % max; } int32_t syncUtilElectRandomMS() { ELECT_TIMER_MS_MIN + syncUtilRand(ELECT_TIMER_MS_RANGE); } -int32_t syncUtilQuorum(int32_t replicaNum) { return replicaNum / 2 + 1; } \ No newline at end of file +int32_t syncUtilQuorum(int32_t replicaNum) { return replicaNum / 2 + 1; } + +cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p) { + char u64buf[128]; + cJSON* pRoot = cJSON_CreateObject(); + + cJSON_AddStringToObject(pRoot, "nodeFqdn", p->nodeFqdn); + cJSON_AddNumberToObject(pRoot, "nodePort", p->nodePort); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SNodeInfo", pRoot); + return pJson; +} + +cJSON* syncUtilRaftId2Json(const SRaftId* p) { + char u64buf[128]; + cJSON* pRoot = cJSON_CreateObject(); + + snprintf(u64buf, sizeof(u64buf), "%lu", p->addr); + cJSON_AddStringToObject(pRoot, "addr", u64buf); + char host[128]; + uint16_t port; + syncUtilU642Addr(p->addr, host, sizeof(host), &port); + cJSON_AddStringToObject(pRoot, "host", host); + cJSON_AddNumberToObject(pRoot, "port", port); + cJSON_AddNumberToObject(pRoot, "vgId", p->vgId); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SNodeInfo", pRoot); + return pJson; +} + +const char* syncUtilState2String(ESyncState state) { + if (state == TAOS_SYNC_STATE_FOLLOWER) { + return "TAOS_SYNC_STATE_FOLLOWER"; + } else if (state == TAOS_SYNC_STATE_CANDIDATE) { + return "TAOS_SYNC_STATE_CANDIDATE"; + } else if (state == TAOS_SYNC_STATE_LEADER) { + return "TAOS_SYNC_STATE_LEADER"; + } else { + return "TAOS_SYNC_STATE_UNKNOWN"; + } +} \ No newline at end of file diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 770d1d1bd8..3a8eea53d4 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -10,6 +10,7 @@ add_executable(syncIOSendMsgServerTest "") add_executable(syncRaftStoreTest "") add_executable(syncEnqTest "") add_executable(syncIndexTest "") +add_executable(syncInitTest "") target_sources(syncTest @@ -60,6 +61,10 @@ target_sources(syncIndexTest PRIVATE "syncIndexTest.cpp" ) +target_sources(syncInitTest + PRIVATE + "syncInitTest.cpp" +) target_include_directories(syncTest @@ -122,6 +127,11 @@ target_include_directories(syncIndexTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncInitTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -172,6 +182,10 @@ target_link_libraries(syncIndexTest sync gtest_main ) +target_link_libraries(syncInitTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncInitTest.cpp b/source/libs/sync/test/syncInitTest.cpp new file mode 100644 index 0000000000..602297b2a6 --- /dev/null +++ b/source/libs/sync/test/syncInitTest.cpp @@ -0,0 +1,74 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[3] = {7010, 7110, 7210}; + +SSyncNode* syncInitTest() { + SSyncFSM* pFsm; + + SSyncInfo syncInfo; + syncInfo.vgId = 1; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path"); + snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path"); + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = 0; + pCfg->replicaNum = 3; + + pCfg->nodeInfo[0].nodePort = ports[0]; + snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + + pCfg->nodeInfo[1].nodePort = ports[1]; + snprintf(pCfg->nodeInfo[1].nodeFqdn, sizeof(pCfg->nodeInfo[1].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); + + pCfg->nodeInfo[2].nodePort = ports[2]; + snprintf(pCfg->nodeInfo[2].nodeFqdn, sizeof(pCfg->nodeInfo[2].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); + + SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + int32_t ret = syncIOStart((char*)"127.0.0.1", ports[0]); + assert(ret == 0); + + SSyncNode* pSyncNode = syncInitTest(); + assert(pSyncNode != NULL); + + cJSON* pJson = syncNode2Json(pSyncNode); + char* serialized = cJSON_Print(pJson); + printf("%s\n", serialized); + free(serialized); + cJSON_Delete(pJson); + + return 0; +}