diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 53fad4607a..d6fdadc1fb 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -138,6 +138,8 @@ typedef struct SSyncInfo { void* rpcClient; int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); + void* queue; + int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg); } SSyncInfo; @@ -147,13 +149,10 @@ typedef struct SSyncNode SSyncNode; int32_t syncInit(); void syncCleanUp(); -int64_t syncStart(const SSyncInfo* pSyncInfo); -void syncStop(int64_t rid); -int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); - -int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); -// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak); - +int64_t syncStart(const SSyncInfo* pSyncInfo); +void syncStop(int64_t rid); +int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); ESyncState syncGetMyRole(int64_t rid); void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole); diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index 7e9e637854..ed5b86fa98 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -26,6 +26,9 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" +void syncNodeElect(SSyncNode* pSyncNode); +void syncNodeRequestVotePeers(SSyncNode* pSyncNode); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 238948b403..40fa31d969 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -58,9 +58,10 @@ extern SSyncIO *gSyncIO; int32_t syncIOStart(char *host, uint16_t port); int32_t syncIOStop(); -int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg); int32_t syncIOTickQ(); int32_t syncIOTickPing(); +int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg); +int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index aedb9662b1..8268624501 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -113,6 +113,8 @@ typedef struct SSyncNode { char path[TSDB_FILENAME_LEN]; void* rpcClient; int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); + void* queue; + int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg); // init internal SNodeInfo me; @@ -149,19 +151,19 @@ typedef struct SSyncNode { // timer tmr_h pPingTimer; int32_t pingTimerMS; - uint8_t pingTimerStart; + uint8_t pingTimerEnable; TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp uint64_t pingTimerCounter; tmr_h pElectTimer; int32_t electTimerMS; - uint8_t electTimerStart; + uint8_t electTimerEnable; TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp uint64_t electTimerCounter; tmr_h pHeartbeatTimer; int32_t heartbeatTimerMS; - uint8_t heartbeatTimerStart; + uint8_t heartbeatTimerEnable; TAOS_TMR_CALLBACK FpHeartbeatTimer; // Timer Fp uint64_t heartbeatTimerCounter; @@ -180,8 +182,24 @@ void syncNodeClose(SSyncNode* pSyncNode); void syncNodePingAll(SSyncNode* pSyncNode); void syncNodePingPeers(SSyncNode* pSyncNode); void syncNodePingSelf(SSyncNode* pSyncNode); -int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); -int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); + +int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); +int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); + +int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode); +int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); +int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms); + +int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); +int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); +int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms); + +void syncNodeBecomeFollower(SSyncNode* pSyncNode); +void syncNodeBecomeLeader(SSyncNode* pSyncNode); +void syncNodeFollower2Candidate(SSyncNode* pSyncNode); +void syncNodeCandidate2Leader(SSyncNode* pSyncNode); +void syncNodeLeader2Follower(SSyncNode* pSyncNode); +void syncNodeCandidate2Follower(SSyncNode* pSyncNode); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 3057e23bc2..5aa27e616b 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -30,6 +30,7 @@ extern "C" { // encode as uint32 typedef enum ESyncMessageType { + SYNC_UNKNOWN = 99, SYNC_PING = 101, SYNC_PING_REPLY = 103, SYNC_CLIENT_REQUEST = 105, @@ -40,6 +41,10 @@ typedef enum ESyncMessageType { SYNC_APPEND_ENTRIES_REPLY = 115, } ESyncMessageType; +// --------------------------------------------- +cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg); +cJSON* syncRpcUnknownMsg2Json(); + // --------------------------------------------- typedef struct SyncPing { uint32_t bytes; diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 7f97ae9e49..a9875d5cae 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -26,6 +26,8 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" +void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 329105e2a1..433201b849 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -14,3 +14,7 @@ */ #include "syncElection.h" + +void syncNodeElect(SSyncNode* pSyncNode) {} + +void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 3ba145a96b..52cc0cda50 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -40,17 +40,6 @@ static void syncIOTickPingFunc(void *param, void *tmrId); // ---------------------------- // public function ------------ -int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { - sTrace( - "<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, " - "pMsg->msgType:%d, pMsg->contLen:%d", - clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle, - pMsg->msgType, pMsg->contLen); - pMsg->handle = NULL; - rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); - return 0; -} - int32_t syncIOStart(char *host, uint16_t port) { gSyncIO = syncIOCreate(host, port); assert(gSyncIO != NULL); @@ -83,6 +72,35 @@ int32_t syncIOTickPing() { return ret; } +int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { + sTrace( + "<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, " + "pMsg->msgType:%d, pMsg->contLen:%d", + clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle, + pMsg->msgType, pMsg->contLen); + { + cJSON *pJson = syncRpcMsg2Json(pMsg); + char * serialized = cJSON_Print(pJson); + sTrace("process syncMessage send: pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + pMsg->handle = NULL; + rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); + return 0; +} + +int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) { + SRpcMsg *pTemp; + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, pMsg, sizeof(SRpcMsg)); + + STaosQueue *pMsgQ = queue; + taosWriteQitem(pMsgQ, pTemp); + + return 0; +} + // local function ------------ static int32_t syncIOStartInternal(SSyncIO *io) { taosBlockSIGPIPE(); @@ -215,6 +233,7 @@ static void *syncIOConsumerFunc(void *param) { syncPingFromRpcMsg(pRpcMsg, pSyncMsg); // memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen); io->FpOnSyncPing(io->pSyncNode, pSyncMsg); + syncPingDestroy(pSyncMsg); } } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { @@ -223,6 +242,7 @@ static void *syncIOConsumerFunc(void *param) { pSyncMsg = syncPingReplyBuild(pRpcMsg->contLen); syncPingReplyFromRpcMsg(pRpcMsg, pSyncMsg); io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); + syncPingReplyDestroy(pSyncMsg); } } else { ; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7e01e7e81c..4183bfbe9a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -56,8 +56,6 @@ void syncStop(int64_t rid) {} int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; } -// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; } - int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; } ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; } @@ -76,6 +74,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->rpcClient = pSyncInfo->rpcClient; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; + pSyncNode->queue = pSyncInfo->queue; + pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg; pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex]; pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1; @@ -93,7 +93,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->pPingTimer = NULL; pSyncNode->pingTimerMS = 1000; - atomic_store_8(&pSyncNode->pingTimerStart, 0); + atomic_store_8(&pSyncNode->pingTimerEnable, 0); pSyncNode->FpPingTimer = syncNodePingTimerCb; pSyncNode->pingTimerCounter = 0; @@ -148,22 +148,76 @@ void syncNodePingSelf(SSyncNode* pSyncNode) { int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { if (pSyncNode->pPingTimer == NULL) { pSyncNode->pPingTimer = - taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager); + taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager); } else { - taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager, + taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } - atomic_store_8(&pSyncNode->pingTimerStart, 1); + atomic_store_8(&pSyncNode->pingTimerEnable, 1); return 0; } int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { - atomic_store_8(&pSyncNode->pingTimerStart, 0); - pSyncNode->pingTimerCounter = TIMER_MAX_MS; + atomic_store_8(&pSyncNode->pingTimerEnable, 0); + pSyncNode->pingTimerMS = TIMER_MAX_MS; return 0; } +int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode) { + if (pSyncNode->pElectTimer == NULL) { + pSyncNode->pElectTimer = + taosTmrStart(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager); + } else { + taosTmrReset(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pElectTimer); + } + + atomic_store_8(&pSyncNode->electTimerEnable, 1); + return 0; +} + +int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { + atomic_store_8(&pSyncNode->electTimerEnable, 0); + pSyncNode->electTimerMS = TIMER_MAX_MS; + return 0; +} + +int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; } + +int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { + if (pSyncNode->pHeartbeatTimer == NULL) { + pSyncNode->pHeartbeatTimer = + taosTmrStart(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager); + } else { + taosTmrReset(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pHeartbeatTimer); + } + + atomic_store_8(&pSyncNode->heartbeatTimerEnable, 1); + return 0; +} + +int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { + atomic_store_8(&pSyncNode->heartbeatTimerEnable, 0); + pSyncNode->heartbeatTimerMS = TIMER_MAX_MS; + return 0; +} + +int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; } + +void syncNodeBecomeFollower(SSyncNode* pSyncNode) {} + +void syncNodeBecomeLeader(SSyncNode* pSyncNode) {} + +void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {} + +void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {} + +void syncNodeLeader2Follower(SSyncNode* pSyncNode) {} + +void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {} + // ------ local funciton --------- static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { sTrace("syncNodePing pSyncNode:%p ", pSyncNode); @@ -204,7 +258,6 @@ static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pM } static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { - sTrace("syncNodeSendMsgById pSyncNode:%p ", pSyncNode); SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); @@ -225,7 +278,7 @@ static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { { cJSON* pJson = syncPing2Json(pMsg); char* serialized = cJSON_Print(pJson); - sTrace("syncNodeOnPingCb syncNodePing pMsg:%s ", serialized); + sTrace("process syncMessage recv: syncNodeOnPingCb pMsg:%s ", serialized); free(serialized); cJSON_Delete(pJson); } @@ -245,7 +298,7 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { { cJSON* pJson = syncPingReply2Json(pMsg); char* serialized = cJSON_Print(pJson); - sTrace("syncNodeOnPingReplyCb syncNodePing pMsg:%s ", serialized); + sTrace("process syncMessage recv: syncNodeOnPingReplyCb pMsg:%s ", serialized); free(serialized); cJSON_Delete(pJson); } @@ -275,7 +328,7 @@ static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesR static void syncNodePingTimerCb(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; - if (atomic_load_8(&pSyncNode->pingTimerStart)) { + if (atomic_load_8(&pSyncNode->pingTimerEnable)) { ++(pSyncNode->pingTimerCounter); // pSyncNode->pingTimerMS += 100; @@ -289,6 +342,6 @@ static void syncNodePingTimerCb(void* param, void* tmrId) { taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { - sTrace("syncNodePingTimerCb: pingTimerStart:%u ", pSyncNode->pingTimerStart); + sTrace("syncNodePingTimerCb: pingTimerEnable:%u ", pSyncNode->pingTimerEnable); } } \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 4c44b4691c..4dbae9bcba 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -20,6 +20,59 @@ void onMessage(SRaft* pRaft, void* pMsg) {} +// --------------------------------------------- +cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { + cJSON* pRoot; + + if (pRpcMsg->msgType == SYNC_PING) { + SyncPing* pSyncMsg = (SyncPing*)pRpcMsg->pCont; + pRoot = syncPing2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { + SyncPingReply* pSyncMsg = (SyncPingReply*)pRpcMsg->pCont; + pRoot = syncPingReply2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) { + pRoot = syncRpcUnknownMsg2Json(); + + } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST_REPLY) { + pRoot = syncRpcUnknownMsg2Json(); + + } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) { + SyncRequestVote* pSyncMsg = (SyncRequestVote*)pRpcMsg->pCont; + pRoot = syncRequestVote2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) { + SyncRequestVoteReply* pSyncMsg = (SyncRequestVoteReply*)pRpcMsg->pCont; + pRoot = syncRequestVoteReply2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) { + SyncAppendEntries* pSyncMsg = (SyncAppendEntries*)pRpcMsg->pCont; + pRoot = syncAppendEntries2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) { + SyncAppendEntriesReply* pSyncMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont; + pRoot = syncAppendEntriesReply2Json(pSyncMsg); + + } else { + pRoot = syncRpcUnknownMsg2Json(); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "RpcMsg", pRoot); + return pJson; +} + +cJSON* syncRpcUnknownMsg2Json() { + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "msgType", SYNC_UNKNOWN); + cJSON_AddStringToObject(pRoot, "data", "known message"); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncPing", pRoot); + return pJson; +} + // ---- message process SyncPing---- SyncPing* syncPingBuild(uint32_t dataLen) { uint32_t bytes = SYNC_PING_FIX_LEN + dataLen; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 4cea7c150e..e3e551fd2b 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -14,3 +14,5 @@ */ #include "syncReplication.h" + +void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} \ No newline at end of file diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 2913d230b2..4c5f7ffa56 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -8,6 +8,7 @@ add_executable(syncIOSendMsgTest "") add_executable(syncIOSendMsgClientTest "") add_executable(syncIOSendMsgServerTest "") add_executable(syncRaftStoreTest "") +add_executable(syncEnqTest "") target_sources(syncTest @@ -50,6 +51,10 @@ target_sources(syncRaftStoreTest PRIVATE "syncRaftStoreTest.cpp" ) +target_sources(syncEnqTest + PRIVATE + "syncEnqTest.cpp" +) target_include_directories(syncTest @@ -102,6 +107,11 @@ target_include_directories(syncRaftStoreTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncEnqTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -144,6 +154,10 @@ target_link_libraries(syncRaftStoreTest sync gtest_main ) +target_link_libraries(syncEnqTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp new file mode 100644 index 0000000000..57e2da6193 --- /dev/null +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -0,0 +1,99 @@ +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[3] = {7010, 7110, 7210}; + +SSyncNode* doSync(int myIndex) { + SSyncFSM* pFsm; + + SSyncInfo syncInfo; + syncInfo.vgId = 1; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping"); + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = 3; + + pCfg->nodeInfo[0].nodePort = ports[0]; + snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + + pCfg->nodeInfo[1].nodePort = ports[1]; + snprintf(pCfg->nodeInfo[1].nodeFqdn, sizeof(pCfg->nodeInfo[1].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); + + pCfg->nodeInfo[2].nodePort = ports[2]; + snprintf(pCfg->nodeInfo[2].nodeFqdn, sizeof(pCfg->nodeInfo[2].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); + + SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +void timerPingAll(void* param, void* tmrId) { + SSyncNode* pSyncNode = (SSyncNode*)param; + syncNodePingAll(pSyncNode); +} + +int main(int argc, char** argv) { + // taosInitLog((char*)"syncPingTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + if (myIndex > 2 || myIndex < 0) { + fprintf(stderr, "myIndex:%d error. should be 0 - 2", myIndex); + return 1; + } + } + + int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + SSyncNode* pSyncNode = doSync(myIndex); + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + + for (int i = 0; i < 10; ++i) { + SyncPingReply* pSyncMsg = syncPingReplyBuild3(&pSyncNode->raftId, &pSyncNode->raftId); + SRpcMsg rpcMsg; + syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + taosMsleep(1000); + } + + while (1) { + taosMsleep(1000); + } + + return 0; +} diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 8268128347..06ba5ba6ce 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -22,6 +22,8 @@ SSyncNode* doSync(int myIndex) { syncInfo.vgId = 1; syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping"); @@ -80,11 +82,9 @@ int main(int argc, char** argv) { ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); - /* - taosMsleep(10000); - ret = syncNodeStopPingTimer(pSyncNode); - assert(ret == 0); - */ + taosMsleep(10000); + ret = syncNodeStopPingTimer(pSyncNode); + assert(ret == 0); while (1) { taosMsleep(1000);