From 446a2ed1bfe19cab6570125b35298efa29bdc82a Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 21 Apr 2022 17:35:48 +0800 Subject: [PATCH] rm syncReplicateTest 2, 3, Load --- source/libs/sync/test/CMakeLists.txt | 42 ---- .../libs/sync/test/syncReplicateLoadTest.cpp | 187 --------------- source/libs/sync/test/syncReplicateTest2.cpp | 203 ----------------- source/libs/sync/test/syncReplicateTest3.cpp | 215 ------------------ 4 files changed, 647 deletions(-) delete mode 100644 source/libs/sync/test/syncReplicateLoadTest.cpp delete mode 100644 source/libs/sync/test/syncReplicateTest2.cpp delete mode 100644 source/libs/sync/test/syncReplicateTest3.cpp diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 14d4401233..3319a73fcf 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -31,9 +31,6 @@ add_executable(syncElectTest "") add_executable(syncEncodeTest "") add_executable(syncWriteTest "") add_executable(syncReplicateTest "") -add_executable(syncReplicateTest2 "") -add_executable(syncReplicateTest3 "") -add_executable(syncReplicateLoadTest "") add_executable(syncRefTest "") add_executable(syncLogStoreCheck "") add_executable(syncRaftCfgTest "") @@ -174,18 +171,6 @@ target_sources(syncReplicateTest PRIVATE "syncReplicateTest.cpp" ) -target_sources(syncReplicateTest2 - PRIVATE - "syncReplicateTest2.cpp" -) -target_sources(syncReplicateTest3 - PRIVATE - "syncReplicateTest3.cpp" -) -target_sources(syncReplicateLoadTest - PRIVATE - "syncReplicateLoadTest.cpp" -) target_sources(syncRefTest PRIVATE "syncRefTest.cpp" @@ -377,21 +362,6 @@ target_include_directories(syncReplicateTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) -target_include_directories(syncReplicateTest2 - PUBLIC - "${CMAKE_SOURCE_DIR}/include/libs/sync" - "${CMAKE_CURRENT_SOURCE_DIR}/../inc" -) -target_include_directories(syncReplicateTest3 - PUBLIC - "${CMAKE_SOURCE_DIR}/include/libs/sync" - "${CMAKE_CURRENT_SOURCE_DIR}/../inc" -) -target_include_directories(syncReplicateLoadTest - PUBLIC - "${CMAKE_SOURCE_DIR}/include/libs/sync" - "${CMAKE_CURRENT_SOURCE_DIR}/../inc" -) target_include_directories(syncRefTest PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/sync" @@ -556,18 +526,6 @@ target_link_libraries(syncReplicateTest sync gtest_main ) -target_link_libraries(syncReplicateTest2 - sync - gtest_main -) -target_link_libraries(syncReplicateTest3 - sync - gtest_main -) -target_link_libraries(syncReplicateLoadTest - sync - gtest_main -) target_link_libraries(syncRefTest sync gtest_main diff --git a/source/libs/sync/test/syncReplicateLoadTest.cpp b/source/libs/sync/test/syncReplicateLoadTest.cpp deleted file mode 100644 index f58c372445..0000000000 --- a/source/libs/sync/test/syncReplicateLoadTest.cpp +++ /dev/null @@ -1,187 +0,0 @@ -#include -#include -#include "syncEnv.h" -#include "syncIO.h" -#include "syncInt.h" -#include "syncMessage.h" -#include "syncRaftEntry.h" -#include "syncRaftLog.h" -#include "syncRaftStore.h" -#include "syncUtil.h" - -void logTest() { - sTrace("--- sync log test: trace"); - sDebug("--- sync log test: debug"); - sInfo("--- sync log test: info"); - sWarn("--- sync log test: warn"); - sError("--- sync log test: error"); - sFatal("--- sync log test: fatal"); -} - -uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; -int32_t replicaNum = 3; -int32_t myIndex = 0; - -SRaftId ids[TSDB_MAX_REPLICA]; -SSyncInfo syncInfo; -SSyncFSM *pFsm; -SWal * pWal; - -void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); - syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); -} - -void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, - cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); - syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); -} - -void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); - syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); -} - -void initFsm() { - pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM)); - pFsm->FpCommitCb = CommitCb; - pFsm->FpPreCommitCb = PreCommitCb; - pFsm->FpRollBackCb = RollBackCb; -} - -int64_t syncNodeInit() { - syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; - syncInfo.FpEqMsg = syncIOEqMsg; - syncInfo.pFsm = pFsm; - snprintf(syncInfo.path, sizeof(syncInfo.path), "./replicate2_test_%d", myIndex); - - int code = walInit(); - assert(code == 0); - SWalCfg walCfg; - memset(&walCfg, 0, sizeof(SWalCfg)); - walCfg.vgId = syncInfo.vgId; - walCfg.fsyncPeriod = 1000; - walCfg.retentionPeriod = 1000; - walCfg.rollPeriod = 1000; - walCfg.retentionSize = 1000; - walCfg.segSize = 1000; - walCfg.level = TAOS_WAL_FSYNC; - - char tmpdir[128]; - snprintf(tmpdir, sizeof(tmpdir), "./replicate2_test_wal_%d", myIndex); - pWal = walOpen(tmpdir, &walCfg); - assert(pWal != NULL); - - syncInfo.pWal = pWal; - - SSyncCfg *pCfg = &syncInfo.syncCfg; - pCfg->myIndex = myIndex; - pCfg->replicaNum = replicaNum; - - for (int i = 0; i < replicaNum; ++i) { - pCfg->nodeInfo[i].nodePort = ports[i]; - snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); - // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); - } - - int64_t rid = syncOpen(&syncInfo); - assert(rid > 0); - syncStart(rid); - - SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid); - assert(pSyncNode != NULL); - - // pSyncNode->hbBaseLine = 500; - // pSyncNode->electBaseLine = 1500; - - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; - gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; - gSyncIO->pSyncNode = pSyncNode; - - syncNodeRelease(pSyncNode); - - return rid; -} - -void initRaftId(SSyncNode *pSyncNode) { - for (int i = 0; i < replicaNum; ++i) { - ids[i] = pSyncNode->replicasId[i]; - char *s = syncUtilRaftId2Str(&ids[i]); - printf("raftId[%d] : %s\n", i, s); - taosMemoryFree(s); - } -} - -SRpcMsg *step0(int i) { - SRpcMsg *pMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg)); - memset(pMsg, 0, sizeof(SRpcMsg)); - pMsg->msgType = 9999; - pMsg->contLen = 128; - pMsg->pCont = taosMemoryMalloc(pMsg->contLen); - snprintf((char *)(pMsg->pCont), pMsg->contLen, "value-%u-%d", ports[myIndex], i); - return pMsg; -} - -SyncClientRequest *step1(const SRpcMsg *pMsg) { - SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true, 1000); - return pRetMsg; -} - -int main(int argc, char **argv) { - // taosInitLog((char *)"syncTest.log", 100000, 10); - tsAsyncLog = 0; - sDebugFlag = 143 + 64; - void logTest(); - - myIndex = 0; - if (argc >= 2) { - myIndex = atoi(argv[1]); - } - - int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); - assert(ret == 0); - - initFsm(); - - ret = syncInit(); - assert(ret == 0); - - int64_t rid = syncNodeInit(); - assert(rid > 0); - - SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid); - assert(pSyncNode != NULL); - - syncNodeLog2((char *)"", pSyncNode); - initRaftId(pSyncNode); - - // only load ... - - while (1) { - sTrace( - "replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " - "electTimerMS:%d, commitIndex:%ld", - pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS, - pSyncNode->commitIndex); - taosMsleep(1000); - } - - return 0; -} diff --git a/source/libs/sync/test/syncReplicateTest2.cpp b/source/libs/sync/test/syncReplicateTest2.cpp deleted file mode 100644 index 1e2f823be5..0000000000 --- a/source/libs/sync/test/syncReplicateTest2.cpp +++ /dev/null @@ -1,203 +0,0 @@ -#include -#include -#include "syncEnv.h" -#include "syncIO.h" -#include "syncInt.h" -#include "syncMessage.h" -#include "syncRaftEntry.h" -#include "syncRaftLog.h" -#include "syncRaftStore.h" -#include "syncUtil.h" - -void logTest() { - sTrace("--- sync log test: trace"); - sDebug("--- sync log test: debug"); - sInfo("--- sync log test: info"); - sWarn("--- sync log test: warn"); - sError("--- sync log test: error"); - sFatal("--- sync log test: fatal"); -} - -uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; -int32_t replicaNum = 3; -int32_t myIndex = 0; - -SRaftId ids[TSDB_MAX_REPLICA]; -SSyncInfo syncInfo; -SSyncFSM *pFsm; -SWal * pWal; - -void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); - syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); -} - -void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, - cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); - syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); -} - -void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); - syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); -} - -void initFsm() { - pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM)); - pFsm->FpCommitCb = CommitCb; - pFsm->FpPreCommitCb = PreCommitCb; - pFsm->FpRollBackCb = RollBackCb; -} - -int64_t syncNodeInit() { - syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; - syncInfo.FpEqMsg = syncIOEqMsg; - syncInfo.pFsm = pFsm; - snprintf(syncInfo.path, sizeof(syncInfo.path), "./replicate2_test_%d", myIndex); - - int code = walInit(); - assert(code == 0); - SWalCfg walCfg; - memset(&walCfg, 0, sizeof(SWalCfg)); - walCfg.vgId = syncInfo.vgId; - walCfg.fsyncPeriod = 1000; - walCfg.retentionPeriod = 1000; - walCfg.rollPeriod = 1000; - walCfg.retentionSize = 1000; - walCfg.segSize = 1000; - walCfg.level = TAOS_WAL_FSYNC; - - char tmpdir[128]; - snprintf(tmpdir, sizeof(tmpdir), "./replicate2_test_wal_%d", myIndex); - pWal = walOpen(tmpdir, &walCfg); - assert(pWal != NULL); - - syncInfo.pWal = pWal; - - SSyncCfg *pCfg = &syncInfo.syncCfg; - pCfg->myIndex = myIndex; - pCfg->replicaNum = replicaNum; - - for (int i = 0; i < replicaNum; ++i) { - pCfg->nodeInfo[i].nodePort = ports[i]; - snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); - // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); - } - - int64_t rid = syncOpen(&syncInfo); - assert(rid > 0); - syncStart(rid); - - SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid); - assert(pSyncNode != NULL); - - // pSyncNode->hbBaseLine = 500; - // pSyncNode->electBaseLine = 1500; - - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; - gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; - gSyncIO->pSyncNode = pSyncNode; - - syncNodeRelease(pSyncNode); - - return rid; -} - -void initRaftId(SSyncNode *pSyncNode) { - for (int i = 0; i < replicaNum; ++i) { - ids[i] = pSyncNode->replicasId[i]; - char *s = syncUtilRaftId2Str(&ids[i]); - printf("raftId[%d] : %s\n", i, s); - taosMemoryFree(s); - } -} - -SRpcMsg *step0(int i) { - SRpcMsg *pMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg)); - memset(pMsg, 0, sizeof(SRpcMsg)); - pMsg->msgType = 9999; - pMsg->contLen = 128; - pMsg->pCont = taosMemoryMalloc(pMsg->contLen); - snprintf((char *)(pMsg->pCont), pMsg->contLen, "value-%u-%d", ports[myIndex], i); - return pMsg; -} - -SyncClientRequest *step1(const SRpcMsg *pMsg) { - SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true, 1000); - return pRetMsg; -} - -int main(int argc, char **argv) { - // taosInitLog((char *)"syncTest.log", 100000, 10); - tsAsyncLog = 0; - sDebugFlag = 143 + 64; - void logTest(); - - myIndex = 0; - if (argc >= 2) { - myIndex = atoi(argv[1]); - } - - int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); - assert(ret == 0); - - initFsm(); - - ret = syncInit(); - assert(ret == 0); - - int64_t rid = syncNodeInit(); - assert(rid > 0); - - SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid); - assert(pSyncNode != NULL); - - syncNodeLog2((char *)"", pSyncNode); - initRaftId(pSyncNode); - - for (int i = 0; i < 30; ++i) { - // step0 - SRpcMsg *pMsg0 = step0(i); - syncRpcMsgLog2((char *)"==step0==", pMsg0); - - syncPropose(rid, pMsg0, true); - taosMsleep(1000); - - taosMemoryFree(pMsg0); - - sTrace( - "syncPropose sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " - "electTimerMS:%d, commitIndex:%ld", - pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS, - pSyncNode->commitIndex); - } - - while (1) { - sTrace( - "replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " - "electTimerMS:%d, commitIndex:%ld", - pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS, - pSyncNode->commitIndex); - taosMsleep(1000); - } - - return 0; -} diff --git a/source/libs/sync/test/syncReplicateTest3.cpp b/source/libs/sync/test/syncReplicateTest3.cpp deleted file mode 100644 index e8e24b507d..0000000000 --- a/source/libs/sync/test/syncReplicateTest3.cpp +++ /dev/null @@ -1,215 +0,0 @@ -#define ALLOW_FORBID_FUNC - -#include -#include -#include "syncEnv.h" -#include "syncIO.h" -#include "syncInt.h" -#include "syncMessage.h" -#include "syncRaftEntry.h" -#include "syncRaftLog.h" -#include "syncRaftStore.h" -#include "syncUtil.h" - -void logTest() { - sTrace("--- sync log test: trace"); - sDebug("--- sync log test: debug"); - sInfo("--- sync log test: info"); - sWarn("--- sync log test: warn"); - sError("--- sync log test: error"); - sFatal("--- sync log test: fatal"); -} - -uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; -int32_t replicaNum = 3; -int32_t myIndex = 0; - -SRaftId ids[TSDB_MAX_REPLICA]; -SSyncInfo syncInfo; -SSyncFSM *pFsm; -SWal * pWal; - -void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); - syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); -} - -void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, - cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); - syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); -} - -void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); - syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); -} - -void initFsm() { - pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM)); - pFsm->FpCommitCb = CommitCb; - pFsm->FpPreCommitCb = PreCommitCb; - pFsm->FpRollBackCb = RollBackCb; -} - -int64_t syncNodeInit() { - syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; - syncInfo.FpEqMsg = syncIOEqMsg; - syncInfo.pFsm = pFsm; - snprintf(syncInfo.path, sizeof(syncInfo.path), "./replicate2_test_%d", myIndex); - - int code = walInit(); - assert(code == 0); - SWalCfg walCfg; - memset(&walCfg, 0, sizeof(SWalCfg)); - walCfg.vgId = syncInfo.vgId; - walCfg.fsyncPeriod = 1000; - walCfg.retentionPeriod = 1000; - walCfg.rollPeriod = 1000; - walCfg.retentionSize = 1000; - walCfg.segSize = 1000; - walCfg.level = TAOS_WAL_FSYNC; - - char tmpdir[128]; - snprintf(tmpdir, sizeof(tmpdir), "./replicate2_test_wal_%d", myIndex); - pWal = walOpen(tmpdir, &walCfg); - assert(pWal != NULL); - - syncInfo.pWal = pWal; - - SSyncCfg *pCfg = &syncInfo.syncCfg; - pCfg->myIndex = myIndex; - pCfg->replicaNum = replicaNum; - - for (int i = 0; i < replicaNum; ++i) { - pCfg->nodeInfo[i].nodePort = ports[i]; - snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); - // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); - } - - int64_t rid = syncOpen(&syncInfo); - assert(rid > 0); - syncStart(rid); - - SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid); - assert(pSyncNode != NULL); - - // pSyncNode->hbBaseLine = 500; - // pSyncNode->electBaseLine = 1500; - - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; - gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; - gSyncIO->pSyncNode = pSyncNode; - - syncNodeRelease(pSyncNode); - - return rid; -} - -void initRaftId(SSyncNode *pSyncNode) { - for (int i = 0; i < replicaNum; ++i) { - ids[i] = pSyncNode->replicasId[i]; - char *s = syncUtilRaftId2Str(&ids[i]); - printf("raftId[%d] : %s\n", i, s); - taosMemoryFree(s); - } -} - -SRpcMsg *step0(int i) { - SRpcMsg *pMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg)); - memset(pMsg, 0, sizeof(SRpcMsg)); - pMsg->msgType = 9999; - pMsg->contLen = 128; - pMsg->pCont = taosMemoryMalloc(pMsg->contLen); - snprintf((char *)(pMsg->pCont), pMsg->contLen, "value-%u-%d", ports[myIndex], i); - return pMsg; -} - -SyncClientRequest *step1(const SRpcMsg *pMsg) { - SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true, 1000); - return pRetMsg; -} - -int main(int argc, char **argv) { - // taosInitLog((char *)"syncTest.log", 100000, 10); - tsAsyncLog = 0; - sDebugFlag = 143 + 64; - void logTest(); - - myIndex = 0; - if (argc >= 2) { - myIndex = atoi(argv[1]); - } - - int recordCount = 100; - if (argc >= 3) { - recordCount = atoi(argv[2]); - } - - int sleepMS = 10; - if (argc >= 4) { - sleepMS = atoi(argv[3]); - } - - int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); - assert(ret == 0); - - initFsm(); - - ret = syncInit(); - assert(ret == 0); - - int64_t rid = syncNodeInit(); - assert(rid > 0); - - SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid); - assert(pSyncNode != NULL); - - syncNodeLog2((char *)"", pSyncNode); - initRaftId(pSyncNode); - - for (int i = 0; i < recordCount; ++i) { - // step0 - SRpcMsg *pMsg0 = step0(i); - syncRpcMsgLog2((char *)"==step0==", pMsg0); - - syncPropose(rid, pMsg0, true); - taosMsleep(sleepMS); - - taosMemoryFree(pMsg0); - - sTrace( - "syncPropose sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " - "electTimerMS:%d, commitIndex:%ld", - pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS, - pSyncNode->commitIndex); - } - - while (1) { - sTrace( - "replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " - "electTimerMS:%d, commitIndex:%ld", - pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS, - pSyncNode->commitIndex); - taosMsleep(1000); - } - - return 0; -}