From 8d789bba31d31f389ff15fdb96ade77d1493123d Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 4 Mar 2022 15:48:09 +0800 Subject: [PATCH 1/7] sync enq --- include/libs/sync/sync.h | 13 ++-- source/libs/sync/inc/syncElection.h | 3 + source/libs/sync/inc/syncIO.h | 3 +- source/libs/sync/inc/syncInt.h | 28 ++++++-- source/libs/sync/inc/syncMessage.h | 5 ++ source/libs/sync/inc/syncReplication.h | 2 + source/libs/sync/src/syncElection.c | 4 ++ source/libs/sync/src/syncIO.c | 42 ++++++++--- source/libs/sync/src/syncMain.c | 79 ++++++++++++++++---- source/libs/sync/src/syncMessage.c | 53 ++++++++++++++ source/libs/sync/src/syncReplication.c | 2 + source/libs/sync/test/CMakeLists.txt | 14 ++++ source/libs/sync/test/syncEnqTest.cpp | 99 ++++++++++++++++++++++++++ source/libs/sync/test/syncPingTest.cpp | 10 +-- 14 files changed, 315 insertions(+), 42 deletions(-) create mode 100644 source/libs/sync/test/syncEnqTest.cpp diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 53fad4607a..d6fdadc1fb 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -138,6 +138,8 @@ typedef struct SSyncInfo { void* rpcClient; int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); + void* queue; + int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg); } SSyncInfo; @@ -147,13 +149,10 @@ typedef struct SSyncNode SSyncNode; int32_t syncInit(); void syncCleanUp(); -int64_t syncStart(const SSyncInfo* pSyncInfo); -void syncStop(int64_t rid); -int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); - -int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); -// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak); - +int64_t syncStart(const SSyncInfo* pSyncInfo); +void syncStop(int64_t rid); +int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); ESyncState syncGetMyRole(int64_t rid); void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole); diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index 7e9e637854..ed5b86fa98 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -26,6 +26,9 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" +void syncNodeElect(SSyncNode* pSyncNode); +void syncNodeRequestVotePeers(SSyncNode* pSyncNode); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 238948b403..40fa31d969 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -58,9 +58,10 @@ extern SSyncIO *gSyncIO; int32_t syncIOStart(char *host, uint16_t port); int32_t syncIOStop(); -int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg); int32_t syncIOTickQ(); int32_t syncIOTickPing(); +int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg); +int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index aedb9662b1..8268624501 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -113,6 +113,8 @@ typedef struct SSyncNode { char path[TSDB_FILENAME_LEN]; void* rpcClient; int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); + void* queue; + int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg); // init internal SNodeInfo me; @@ -149,19 +151,19 @@ typedef struct SSyncNode { // timer tmr_h pPingTimer; int32_t pingTimerMS; - uint8_t pingTimerStart; + uint8_t pingTimerEnable; TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp uint64_t pingTimerCounter; tmr_h pElectTimer; int32_t electTimerMS; - uint8_t electTimerStart; + uint8_t electTimerEnable; TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp uint64_t electTimerCounter; tmr_h pHeartbeatTimer; int32_t heartbeatTimerMS; - uint8_t heartbeatTimerStart; + uint8_t heartbeatTimerEnable; TAOS_TMR_CALLBACK FpHeartbeatTimer; // Timer Fp uint64_t heartbeatTimerCounter; @@ -180,8 +182,24 @@ void syncNodeClose(SSyncNode* pSyncNode); void syncNodePingAll(SSyncNode* pSyncNode); void syncNodePingPeers(SSyncNode* pSyncNode); void syncNodePingSelf(SSyncNode* pSyncNode); -int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); -int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); + +int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); +int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); + +int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode); +int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); +int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms); + +int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); +int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); +int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms); + +void syncNodeBecomeFollower(SSyncNode* pSyncNode); +void syncNodeBecomeLeader(SSyncNode* pSyncNode); +void syncNodeFollower2Candidate(SSyncNode* pSyncNode); +void syncNodeCandidate2Leader(SSyncNode* pSyncNode); +void syncNodeLeader2Follower(SSyncNode* pSyncNode); +void syncNodeCandidate2Follower(SSyncNode* pSyncNode); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 3057e23bc2..5aa27e616b 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -30,6 +30,7 @@ extern "C" { // encode as uint32 typedef enum ESyncMessageType { + SYNC_UNKNOWN = 99, SYNC_PING = 101, SYNC_PING_REPLY = 103, SYNC_CLIENT_REQUEST = 105, @@ -40,6 +41,10 @@ typedef enum ESyncMessageType { SYNC_APPEND_ENTRIES_REPLY = 115, } ESyncMessageType; +// --------------------------------------------- +cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg); +cJSON* syncRpcUnknownMsg2Json(); + // --------------------------------------------- typedef struct SyncPing { uint32_t bytes; diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 7f97ae9e49..a9875d5cae 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -26,6 +26,8 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" +void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 329105e2a1..433201b849 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -14,3 +14,7 @@ */ #include "syncElection.h" + +void syncNodeElect(SSyncNode* pSyncNode) {} + +void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 3ba145a96b..52cc0cda50 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -40,17 +40,6 @@ static void syncIOTickPingFunc(void *param, void *tmrId); // ---------------------------- // public function ------------ -int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { - sTrace( - "<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, " - "pMsg->msgType:%d, pMsg->contLen:%d", - clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle, - pMsg->msgType, pMsg->contLen); - pMsg->handle = NULL; - rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); - return 0; -} - int32_t syncIOStart(char *host, uint16_t port) { gSyncIO = syncIOCreate(host, port); assert(gSyncIO != NULL); @@ -83,6 +72,35 @@ int32_t syncIOTickPing() { return ret; } +int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { + sTrace( + "<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, " + "pMsg->msgType:%d, pMsg->contLen:%d", + clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle, + pMsg->msgType, pMsg->contLen); + { + cJSON *pJson = syncRpcMsg2Json(pMsg); + char * serialized = cJSON_Print(pJson); + sTrace("process syncMessage send: pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + pMsg->handle = NULL; + rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); + return 0; +} + +int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) { + SRpcMsg *pTemp; + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, pMsg, sizeof(SRpcMsg)); + + STaosQueue *pMsgQ = queue; + taosWriteQitem(pMsgQ, pTemp); + + return 0; +} + // local function ------------ static int32_t syncIOStartInternal(SSyncIO *io) { taosBlockSIGPIPE(); @@ -215,6 +233,7 @@ static void *syncIOConsumerFunc(void *param) { syncPingFromRpcMsg(pRpcMsg, pSyncMsg); // memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen); io->FpOnSyncPing(io->pSyncNode, pSyncMsg); + syncPingDestroy(pSyncMsg); } } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { @@ -223,6 +242,7 @@ static void *syncIOConsumerFunc(void *param) { pSyncMsg = syncPingReplyBuild(pRpcMsg->contLen); syncPingReplyFromRpcMsg(pRpcMsg, pSyncMsg); io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); + syncPingReplyDestroy(pSyncMsg); } } else { ; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7e01e7e81c..4183bfbe9a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -56,8 +56,6 @@ void syncStop(int64_t rid) {} int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; } -// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; } - int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; } ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; } @@ -76,6 +74,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->rpcClient = pSyncInfo->rpcClient; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; + pSyncNode->queue = pSyncInfo->queue; + pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg; pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex]; pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1; @@ -93,7 +93,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->pPingTimer = NULL; pSyncNode->pingTimerMS = 1000; - atomic_store_8(&pSyncNode->pingTimerStart, 0); + atomic_store_8(&pSyncNode->pingTimerEnable, 0); pSyncNode->FpPingTimer = syncNodePingTimerCb; pSyncNode->pingTimerCounter = 0; @@ -148,22 +148,76 @@ void syncNodePingSelf(SSyncNode* pSyncNode) { int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { if (pSyncNode->pPingTimer == NULL) { pSyncNode->pPingTimer = - taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager); + taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager); } else { - taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager, + taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } - atomic_store_8(&pSyncNode->pingTimerStart, 1); + atomic_store_8(&pSyncNode->pingTimerEnable, 1); return 0; } int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { - atomic_store_8(&pSyncNode->pingTimerStart, 0); - pSyncNode->pingTimerCounter = TIMER_MAX_MS; + atomic_store_8(&pSyncNode->pingTimerEnable, 0); + pSyncNode->pingTimerMS = TIMER_MAX_MS; return 0; } +int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode) { + if (pSyncNode->pElectTimer == NULL) { + pSyncNode->pElectTimer = + taosTmrStart(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager); + } else { + taosTmrReset(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pElectTimer); + } + + atomic_store_8(&pSyncNode->electTimerEnable, 1); + return 0; +} + +int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { + atomic_store_8(&pSyncNode->electTimerEnable, 0); + pSyncNode->electTimerMS = TIMER_MAX_MS; + return 0; +} + +int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; } + +int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { + if (pSyncNode->pHeartbeatTimer == NULL) { + pSyncNode->pHeartbeatTimer = + taosTmrStart(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager); + } else { + taosTmrReset(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pHeartbeatTimer); + } + + atomic_store_8(&pSyncNode->heartbeatTimerEnable, 1); + return 0; +} + +int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { + atomic_store_8(&pSyncNode->heartbeatTimerEnable, 0); + pSyncNode->heartbeatTimerMS = TIMER_MAX_MS; + return 0; +} + +int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; } + +void syncNodeBecomeFollower(SSyncNode* pSyncNode) {} + +void syncNodeBecomeLeader(SSyncNode* pSyncNode) {} + +void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {} + +void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {} + +void syncNodeLeader2Follower(SSyncNode* pSyncNode) {} + +void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {} + // ------ local funciton --------- static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { sTrace("syncNodePing pSyncNode:%p ", pSyncNode); @@ -204,7 +258,6 @@ static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pM } static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { - sTrace("syncNodeSendMsgById pSyncNode:%p ", pSyncNode); SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); @@ -225,7 +278,7 @@ static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { { cJSON* pJson = syncPing2Json(pMsg); char* serialized = cJSON_Print(pJson); - sTrace("syncNodeOnPingCb syncNodePing pMsg:%s ", serialized); + sTrace("process syncMessage recv: syncNodeOnPingCb pMsg:%s ", serialized); free(serialized); cJSON_Delete(pJson); } @@ -245,7 +298,7 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { { cJSON* pJson = syncPingReply2Json(pMsg); char* serialized = cJSON_Print(pJson); - sTrace("syncNodeOnPingReplyCb syncNodePing pMsg:%s ", serialized); + sTrace("process syncMessage recv: syncNodeOnPingReplyCb pMsg:%s ", serialized); free(serialized); cJSON_Delete(pJson); } @@ -275,7 +328,7 @@ static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesR static void syncNodePingTimerCb(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; - if (atomic_load_8(&pSyncNode->pingTimerStart)) { + if (atomic_load_8(&pSyncNode->pingTimerEnable)) { ++(pSyncNode->pingTimerCounter); // pSyncNode->pingTimerMS += 100; @@ -289,6 +342,6 @@ static void syncNodePingTimerCb(void* param, void* tmrId) { taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { - sTrace("syncNodePingTimerCb: pingTimerStart:%u ", pSyncNode->pingTimerStart); + sTrace("syncNodePingTimerCb: pingTimerEnable:%u ", pSyncNode->pingTimerEnable); } } \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 4c44b4691c..4dbae9bcba 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -20,6 +20,59 @@ void onMessage(SRaft* pRaft, void* pMsg) {} +// --------------------------------------------- +cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { + cJSON* pRoot; + + if (pRpcMsg->msgType == SYNC_PING) { + SyncPing* pSyncMsg = (SyncPing*)pRpcMsg->pCont; + pRoot = syncPing2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { + SyncPingReply* pSyncMsg = (SyncPingReply*)pRpcMsg->pCont; + pRoot = syncPingReply2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) { + pRoot = syncRpcUnknownMsg2Json(); + + } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST_REPLY) { + pRoot = syncRpcUnknownMsg2Json(); + + } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) { + SyncRequestVote* pSyncMsg = (SyncRequestVote*)pRpcMsg->pCont; + pRoot = syncRequestVote2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) { + SyncRequestVoteReply* pSyncMsg = (SyncRequestVoteReply*)pRpcMsg->pCont; + pRoot = syncRequestVoteReply2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) { + SyncAppendEntries* pSyncMsg = (SyncAppendEntries*)pRpcMsg->pCont; + pRoot = syncAppendEntries2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) { + SyncAppendEntriesReply* pSyncMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont; + pRoot = syncAppendEntriesReply2Json(pSyncMsg); + + } else { + pRoot = syncRpcUnknownMsg2Json(); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "RpcMsg", pRoot); + return pJson; +} + +cJSON* syncRpcUnknownMsg2Json() { + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "msgType", SYNC_UNKNOWN); + cJSON_AddStringToObject(pRoot, "data", "known message"); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncPing", pRoot); + return pJson; +} + // ---- message process SyncPing---- SyncPing* syncPingBuild(uint32_t dataLen) { uint32_t bytes = SYNC_PING_FIX_LEN + dataLen; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 4cea7c150e..e3e551fd2b 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -14,3 +14,5 @@ */ #include "syncReplication.h" + +void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} \ No newline at end of file diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 2913d230b2..4c5f7ffa56 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -8,6 +8,7 @@ add_executable(syncIOSendMsgTest "") add_executable(syncIOSendMsgClientTest "") add_executable(syncIOSendMsgServerTest "") add_executable(syncRaftStoreTest "") +add_executable(syncEnqTest "") target_sources(syncTest @@ -50,6 +51,10 @@ target_sources(syncRaftStoreTest PRIVATE "syncRaftStoreTest.cpp" ) +target_sources(syncEnqTest + PRIVATE + "syncEnqTest.cpp" +) target_include_directories(syncTest @@ -102,6 +107,11 @@ target_include_directories(syncRaftStoreTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncEnqTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -144,6 +154,10 @@ target_link_libraries(syncRaftStoreTest sync gtest_main ) +target_link_libraries(syncEnqTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp new file mode 100644 index 0000000000..57e2da6193 --- /dev/null +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -0,0 +1,99 @@ +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[3] = {7010, 7110, 7210}; + +SSyncNode* doSync(int myIndex) { + SSyncFSM* pFsm; + + SSyncInfo syncInfo; + syncInfo.vgId = 1; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping"); + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = 3; + + pCfg->nodeInfo[0].nodePort = ports[0]; + snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + + pCfg->nodeInfo[1].nodePort = ports[1]; + snprintf(pCfg->nodeInfo[1].nodeFqdn, sizeof(pCfg->nodeInfo[1].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); + + pCfg->nodeInfo[2].nodePort = ports[2]; + snprintf(pCfg->nodeInfo[2].nodeFqdn, sizeof(pCfg->nodeInfo[2].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); + + SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +void timerPingAll(void* param, void* tmrId) { + SSyncNode* pSyncNode = (SSyncNode*)param; + syncNodePingAll(pSyncNode); +} + +int main(int argc, char** argv) { + // taosInitLog((char*)"syncPingTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + if (myIndex > 2 || myIndex < 0) { + fprintf(stderr, "myIndex:%d error. should be 0 - 2", myIndex); + return 1; + } + } + + int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + SSyncNode* pSyncNode = doSync(myIndex); + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + + for (int i = 0; i < 10; ++i) { + SyncPingReply* pSyncMsg = syncPingReplyBuild3(&pSyncNode->raftId, &pSyncNode->raftId); + SRpcMsg rpcMsg; + syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + taosMsleep(1000); + } + + while (1) { + taosMsleep(1000); + } + + return 0; +} diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 8268128347..06ba5ba6ce 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -22,6 +22,8 @@ SSyncNode* doSync(int myIndex) { syncInfo.vgId = 1; syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping"); @@ -80,11 +82,9 @@ int main(int argc, char** argv) { ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); - /* - taosMsleep(10000); - ret = syncNodeStopPingTimer(pSyncNode); - assert(ret == 0); - */ + taosMsleep(10000); + ret = syncNodeStopPingTimer(pSyncNode); + assert(ret == 0); while (1) { taosMsleep(1000); From 98b35306998cd9e0be9c27be9b782b8d09ec12aa Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 4 Mar 2022 16:54:25 +0800 Subject: [PATCH 2/7] sync timeout --- source/libs/sync/inc/syncIO.h | 7 ++++--- source/libs/sync/inc/syncInt.h | 4 ++++ source/libs/sync/inc/syncMessage.h | 14 ++++++++++++++ source/libs/sync/src/syncIO.c | 8 +++++--- source/libs/sync/src/syncMain.c | 6 ++++++ source/libs/sync/test/syncEnqTest.cpp | 2 +- 6 files changed, 34 insertions(+), 7 deletions(-) diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 40fa31d969..a948de8ac1 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -31,11 +31,11 @@ extern "C" { typedef struct SSyncIO { STaosQueue *pMsgQ; - STaosQset * pQset; + STaosQset *pQset; pthread_t consumerTid; - void * serverRpc; - void * clientRpc; + void *serverRpc; + void *clientRpc; SEpSet myAddr; void *ioTimerTickQ; @@ -49,6 +49,7 @@ typedef struct SSyncIO { int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg); int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg); int32_t (*FpOnSyncAppendEntriesReply)(SSyncNode *pSyncNode, SyncAppendEntriesReply *pMsg); + int32_t (*FpOnSyncTimeout)(SSyncNode *pSyncNode, SyncTimeout *pMsg); int8_t isStart; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 8268624501..d67b419b24 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -71,6 +71,9 @@ extern int32_t sDebugFlag; struct SRaft; typedef struct SRaft SRaft; +struct SyncTimeout; +typedef struct SyncTimeout SyncTimeout; + struct SyncPing; typedef struct SyncPing SyncPing; @@ -174,6 +177,7 @@ typedef struct SSyncNode { int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); + int32_t (*FpOnTimeout)(SSyncNode* pSyncNode, SyncTimeout* pMsg); } SSyncNode; diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 5aa27e616b..a2e745b3d9 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -39,12 +39,26 @@ typedef enum ESyncMessageType { SYNC_REQUEST_VOTE_REPLY = 111, SYNC_APPEND_ENTRIES = 113, SYNC_APPEND_ENTRIES_REPLY = 115, + SYNC_TIMEOUT = 117, } ESyncMessageType; // --------------------------------------------- cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg); cJSON* syncRpcUnknownMsg2Json(); +// --------------------------------------------- +typedef enum ESyncTimeoutType { + SYNC_TIMEOUT_PING = 0, + SYNC_TIMEOUT_ELECTION, + SYNC_TIMEOUT_HEARTBEAT, + +} ESyncTimeoutType; + +typedef struct SyncTimeout { + ESyncTimeoutType type; + void* data; +} SyncTimeout; + // --------------------------------------------- typedef struct SyncPing { uint32_t bytes; diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 52cc0cda50..ffd982f233 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -80,7 +80,7 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { pMsg->msgType, pMsg->contLen); { cJSON *pJson = syncRpcMsg2Json(pMsg); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); sTrace("process syncMessage send: pMsg:%s ", serialized); free(serialized); cJSON_Delete(pJson); @@ -211,7 +211,7 @@ static void *syncIOConsumerFunc(void *param) { SSyncIO *io = param; STaosQall *qall; - SRpcMsg * pRpcMsg, rpcMsg; + SRpcMsg *pRpcMsg, rpcMsg; int type; qall = taosAllocateQall(); @@ -244,7 +244,9 @@ static void *syncIOConsumerFunc(void *param) { io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); syncPingReplyDestroy(pSyncMsg); } - } else { + + } else if (pRpcMsg->msgType == SYNC_TIMEOUT) { + } else { ; } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 4183bfbe9a..57bc754735 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -37,6 +37,7 @@ static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); // --------------------------------- int32_t syncInit() { @@ -326,6 +327,11 @@ static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesR return ret; } +static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { + int32_t ret = 0; + return ret; +} + static void syncNodePingTimerCb(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_8(&pSyncNode->pingTimerEnable)) { diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp index 57e2da6193..0bf43f933e 100644 --- a/source/libs/sync/test/syncEnqTest.cpp +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -85,7 +85,7 @@ int main(int argc, char** argv) { for (int i = 0; i < 10; ++i) { SyncPingReply* pSyncMsg = syncPingReplyBuild3(&pSyncNode->raftId, &pSyncNode->raftId); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); taosMsleep(1000); From 09be95e40463c51ccc2094e4b88bb51d42ac57a4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 5 Mar 2022 10:23:16 +0800 Subject: [PATCH 3/7] sysinfo for monitor --- include/libs/monitor/monitor.h | 10 +-- include/os/osSysinfo.h | 4 +- source/dnode/mgmt/impl/inc/dndEnv.h | 1 + source/dnode/mgmt/impl/src/dndMgmt.c | 14 ++-- source/dnode/mgmt/impl/src/dndVnodes.c | 7 ++ source/os/src/osSysinfo.c | 108 +++++++++++-------------- 6 files changed, 69 insertions(+), 75 deletions(-) diff --git a/include/libs/monitor/monitor.h b/include/libs/monitor/monitor.h index 0041f3ac5f..19b4ad6dc6 100644 --- a/include/libs/monitor/monitor.h +++ b/include/libs/monitor/monitor.h @@ -92,11 +92,11 @@ typedef struct { int64_t mem_engine; // KB int64_t mem_system; // KB int64_t mem_total; // KB - float disk_engine; // GB - float disk_used; // GB - float disk_total; // GB - int64_t net_in; - int64_t net_out; + int64_t disk_engine; // Byte + int64_t disk_used; // Byte + int64_t disk_total; // Byte + int64_t net_in; // Byte + int64_t net_out; // Byte float io_read; float io_write; float io_read_disk; diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index e14dba8269..da0a801290 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -43,8 +43,8 @@ int32_t taosGetTotalMemory(int64_t *totalKB); int32_t taosGetProcMemory(int64_t *usedKB); int32_t taosGetSysMemory(int64_t *usedKB); int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize); -int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars); -int32_t taosGetProcIO(float *readKB, float *writeKB); +int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes); +int32_t taosGetProcIOSpeed(float *readKB, float *writeKB, float *readDiskKB, float *writeDiskKB); int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes); int32_t taosGetBandSpeed(float *bandSpeedKb); diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index efe54ee1e5..b9acbea02f 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -108,6 +108,7 @@ typedef struct { SHashObj *hash; int32_t openVnodes; int32_t totalVnodes; + int32_t masterNum; SRWLatch latch; SQWorkerPool queryPool; SFWorkerPool fetchPool; diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index 60cfdc299c..0a3a39e830 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -487,12 +487,10 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { taosGetSysMemory(&pInfo->mem_system); pInfo->mem_total = tsTotalMemoryKB; pInfo->disk_engine = 0; - pInfo->disk_used = tsDataSpace.size.used / (1024 * 1024 * 1024.0); - pInfo->disk_total = tsDataSpace.size.avail / (1024 * 1024 * 1024.0); + pInfo->disk_used = tsDataSpace.size.used; + pInfo->disk_total = tsDataSpace.size.avail; taosGetCardInfo(NULL, &pInfo->net_in, &pInfo->net_out); - taosGetProcIO(&pInfo->io_read, &pInfo->io_write); - pInfo->io_read_disk = 0; - pInfo->io_write_disk = 0; + taosGetProcIOSpeed(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk); pInfo->req_select = 0; pInfo->req_select_rate = 0; pInfo->req_insert = 0; @@ -501,9 +499,9 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { pInfo->req_insert_batch = 0; pInfo->req_insert_batch_success = 0; pInfo->req_insert_batch_rate = 0; - pInfo->errors = 0; - pInfo->vnodes_num = 0; - pInfo->masters = 0; + pInfo->errors = tsNumOfErrorLogs; + pInfo->vnodes_num = pDnode->vmgmt.totalVnodes; + pInfo->masters = pDnode->vmgmt.masterNum; pInfo->has_mnode = dndIsMnode(pDnode); } diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 1c7bac4b6b..28bc615aba 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -17,6 +17,7 @@ #include "dndVnodes.h" #include "dndMgmt.h" #include "dndTransport.h" +#include "sync.h" typedef struct { int32_t vgId; @@ -979,6 +980,8 @@ void dndCleanupVnodes(SDnode *pDnode) { void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; + int32_t totalVnodes = 0; + int32_t masterNum = 0; taosRLockLatch(&pMgmt->latch); @@ -993,8 +996,12 @@ void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) { vnodeGetLoad(pVnode->pImpl, &vload); taosArrayPush(pLoads, &vload); + totalVnodes++; + if (vload.role == TAOS_SYNC_STATE_LEADER) masterNum++; pIter = taosHashIterate(pMgmt->hash, pIter); } taosRUnLockLatch(&pMgmt->latch); + pMgmt->totalVnodes = totalVnodes; + pMgmt->masterNum = masterNum; } diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 12952141a4..73b64b5319 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -124,52 +124,26 @@ int32_t taosGetBandSpeed(float *bandSpeedKb) { return 0; } -int32_t taosReadProcIO(int64_t *readbyte, int64_t *writebyte) { +int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { IO_COUNTERS io_counter; if (GetProcessIoCounters(GetCurrentProcess(), &io_counter)) { - if (readbyte) *readbyte = io_counter.ReadTransferCount; - if (writebyte) *writebyte = io_counter.WriteTransferCount; + if (rchars) *rchars = io_counter.ReadTransferCount; + if (wchars) *wchars = io_counter.WriteTransferCount; + if (read_bytes) *read_bytes = 0; + if (write_bytes) *write_bytes = 0; return 0; } return -1; } -int32_t taosGetProcIO(float *readKB, float *writeKB) { - static int64_t lastReadbyte = -1; - static int64_t lastWritebyte = -1; - - int64_t curReadbyte = 0; - int64_t curWritebyte = 0; - - if (taosReadProcIO(&curReadbyte, &curWritebyte) != 0) { - return -1; - } - - if (lastReadbyte == -1 || lastWritebyte == -1) { - lastReadbyte = curReadbyte; - lastWritebyte = curWritebyte; - return -1; - } - - *readKB = (float)((double)(curReadbyte - lastReadbyte) / 1024); - *writeKB = (float)((double)(curWritebyte - lastWritebyte) / 1024); - if (*readKB < 0) *readKB = 0; - if (*writeKB < 0) *writeKB = 0; - - lastReadbyte = curReadbyte; - lastWritebyte = curWritebyte; - - return 0; -} - void taosGetSystemInfo() { taosGetCpuCores(&tsNumOfCores); taosGetTotalMemory(&tsTotalMemoryKB); - float tmp1, tmp2; + float tmp1, tmp2, tmp3, tmp4; taosGetBandSpeed(&tmp1); taosGetCpuUsage(&tmp1, &tmp2); - taosGetProcIO(&tmp1, &tmp2); + taosGetProcIOSpeed(&tmp1, &tmp2, &tmp3, &tmp4); } void taosKillSystem() { @@ -259,15 +233,11 @@ void taosGetSystemInfo() { tsNumOfCores = sysconf(_SC_NPROCESSORS_ONLN); } -int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars) { +int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { if (rchars) *rchars = 0; if (wchars) *wchars = 0; - return 0; -} - -int32_t taosGetProcIO(float *readKB, float *writeKB) { - *readKB = 0; - *writeKB = 0; + if (read_bytes) *read_bytes = 0; + if (write_bytes) *write_bytes = 0; return 0; } @@ -631,8 +601,7 @@ int32_t taosGetBandSpeed(float *bandSpeedKb) { return 0; } -int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars) { - // FILE *fp = fopen(tsProcIOFile, "r"); +int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { TdFilePtr pFile = taosOpenFile(tsProcIOFile, TD_FILE_READ | TD_FILE_STREAM); if (pFile == NULL) { // printf("open file:%s failed", tsProcIOFile); @@ -655,16 +624,22 @@ int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars) { } else if (strstr(line, "wchar:") != NULL) { sscanf(line, "%s %" PRId64, tmp, wchars); readIndex++; - } else { + } if (strstr(line, "read_bytes:") != NULL) { + sscanf(line, "%s %" PRId64, tmp, read_bytes); + readIndex++; + } else if (strstr(line, "write_bytes::") != NULL) { + sscanf(line, "%s %" PRId64, tmp, write_bytes); + readIndex++; + }else { } - if (readIndex >= 2) break; + if (readIndex >= 4) break; } if (line != NULL) tfree(line); taosCloseFile(&pFile); - if (readIndex < 2) { + if (readIndex < 4) { // printf("read file:%s failed", tsProcIOFile); return -1; } @@ -672,30 +647,43 @@ int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars) { return 0; } -int32_t taosGetProcIO(float *readKB, float *writeKB) { - static int64_t lastReadbyte = -1; - static int64_t lastWritebyte = -1; +int32_t taosGetProcIOSpeed(float *readKB, float *writeKB, float *readDiskKB, float *writeDiskKB) { + static int64_t lastRchar = -1; + static int64_t lastWchar = -1; + static int64_t lastRbyte = -1; + static int64_t lastWbyte = -1; - int64_t curReadbyte = 0; - int64_t curWritebyte = 0; + int64_t curRchar = 0; + int64_t curWchar = 0; + int64_t curRbyte = 0; + int64_t curWbyte = 0; - if (taosReadProcIO(&curReadbyte, &curWritebyte) != 0) { + if (taosReadProcIO(&curRchar, &curWchar, &curRbyte, &curWbyte) != 0) { return -1; } - if (lastReadbyte == -1 || lastWritebyte == -1) { - lastReadbyte = curReadbyte; - lastWritebyte = curWritebyte; + if (lastRchar == -1 || lastWchar == -1 || lastRbyte == -1 || lastWbyte == -1) { + lastRchar = curRchar; + lastWchar = curWchar; + lastRbyte = curRbyte; + lastWbyte = curWbyte; return -1; } - *readKB = (float)((double)(curReadbyte - lastReadbyte) / 1024); - *writeKB = (float)((double)(curWritebyte - lastWritebyte) / 1024); + *readKB = (curRchar - lastRchar) / 1024.0f; + *writeKB = (curWchar - lastWchar) / 1024.0f; + *readDiskKB = (curRbyte - lastRbyte) / 1024.0f; + *writeDiskKB = (curWbyte - lastWbyte) / 1024.0f; + if (*readKB < 0) *readKB = 0; if (*writeKB < 0) *writeKB = 0; + if (*readDiskKB < 0) *readDiskKB = 0; + if (*writeDiskKB < 0) *writeDiskKB = 0; - lastReadbyte = curReadbyte; - lastWritebyte = curWritebyte; + lastRchar = curRchar; + lastWchar = curWchar; + lastRbyte = curRbyte; + lastWbyte = curWbyte; return 0; } @@ -705,10 +693,10 @@ void taosGetSystemInfo() { taosGetCpuCores(&tsNumOfCores); taosGetTotalMemory(&tsTotalMemoryKB); - float tmp1, tmp2; + float tmp1, tmp2, tmp3, tmp4; taosGetBandSpeed(&tmp1); taosGetCpuUsage(&tmp1, &tmp2); - taosGetProcIO(&tmp1, &tmp2); + taosGetProcIOSpeed(&tmp1, &tmp2, &tmp3, &tmp4); } void taosKillSystem() { From c368e596891489871c105eb80ceb97d542f70dde Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 5 Mar 2022 11:08:15 +0800 Subject: [PATCH 4/7] add update on index --- source/libs/index/src/index.c | 114 +++++++++++----------------- source/libs/index/src/index_tfile.c | 1 - source/libs/index/test/jsonUT.cc | 16 ++++ 3 files changed, 62 insertions(+), 69 deletions(-) diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 57e965a753..75dc05fc5b 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -16,6 +16,7 @@ #include "index.h" #include "indexInt.h" #include "index_cache.h" +#include "index_comm.h" #include "index_tfile.h" #include "index_util.h" #include "tdef.h" @@ -30,8 +31,6 @@ void* indexQhandle = NULL; -static char JSON_COLUMN[] = "JSON"; - #define INDEX_MERGE_ADD_DEL(src, dst, tgt) \ { \ bool f = false; \ @@ -64,13 +63,11 @@ typedef struct SIdxColInfo { int cVersion; } SIdxColInfo; -typedef struct SIdxMergeHelper { - char* colVal; +typedef struct SIdxTempResult { SArray* total; SArray* added; SArray* deled; - bool reset; -} SIdxMergeHelper; +} SIdxTempResult; static pthread_once_t isInit = PTHREAD_ONCE_INIT; // static void indexInit(); @@ -82,8 +79,7 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch); // merge cache and tfile by opera type -static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxMergeHelper* helper); -static void indexMergeSameKey(SArray* result, TFileValue* tv, SIdxMergeHelper* helper); +static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTempResult* helper); // static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); // int32_t indexSerialKey(ICacheKey* key, char* buf); @@ -399,7 +395,6 @@ static void indexInterResultsDestroy(SArray* results) { static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* fResults) { // refactor, merge interResults into fResults by oType - for (int i = 0; i < taosArrayGetSize(interResults); i--) { SArray* t = taosArrayGetP(interResults, i); taosArraySort(t, uidCompare); @@ -418,98 +413,82 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType return 0; } -SIdxMergeHelper* sIdxMergeHelperCreate() { - SIdxMergeHelper* hp = calloc(1, sizeof(SIdxMergeHelper)); - hp->total = taosArrayInit(4, sizeof(uint64_t)); - hp->added = taosArrayInit(4, sizeof(uint64_t)); - hp->deled = taosArrayInit(4, sizeof(uint64_t)); - hp->reset = false; - return hp; +SIdxTempResult* sIdxTempResultCreate() { + SIdxTempResult* tr = calloc(1, sizeof(SIdxTempResult)); + tr->total = taosArrayInit(4, sizeof(uint64_t)); + tr->added = taosArrayInit(4, sizeof(uint64_t)); + tr->deled = taosArrayInit(4, sizeof(uint64_t)); + return tr; } -void sIdxMergeHelperClear(SIdxMergeHelper* hp) { - if (hp == NULL) { +void sIdxTempResultClear(SIdxTempResult* tr) { + if (tr == NULL) { return; } - hp->reset = false; - taosArrayClear(hp->total); - taosArrayClear(hp->added); - taosArrayClear(hp->deled); + taosArrayClear(tr->total); + taosArrayClear(tr->added); + taosArrayClear(tr->deled); } -void sIdxMergeHelperDestroy(SIdxMergeHelper* hp) { - if (hp == NULL) { +void sIdxTempResultDestroy(SIdxTempResult* tr) { + if (tr == NULL) { return; } - taosArrayDestroy(hp->total); - taosArrayDestroy(hp->added); - taosArrayDestroy(hp->deled); + taosArrayDestroy(tr->total); + taosArrayDestroy(tr->added); + taosArrayDestroy(tr->deled); } -static void indexMergeSameKey(SArray* result, TFileValue* tv, SIdxMergeHelper* helper) { - int32_t sz = result ? taosArrayGetSize(result) : 0; - if (sz > 0) { - // TODO(yihao): remove duplicate tableid - TFileValue* lv = taosArrayGetP(result, sz - 1); - // indexError("merge colVal: %s", lv->colVal); - if (strcmp(lv->colVal, tv->colVal) == 0) { - taosArrayAddAll(lv->tableId, tv->tableId); - tfileValueDestroy(tv); - } else { - taosArrayPush(result, &tv); - } - } else { - taosArrayPush(result, &tv); - } -} -static void sIdxMergeResult(SArray* result, SIdxMergeHelper* mh) { - taosArraySort(mh->total, uidCompare); - taosArraySort(mh->added, uidCompare); - taosArraySort(mh->deled, uidCompare); +static void sIdxTempResultMergeTo(SArray* result, SIdxTempResult* tr) { + taosArraySort(tr->total, uidCompare); + taosArraySort(tr->added, uidCompare); + taosArraySort(tr->deled, uidCompare); SArray* arrs = taosArrayInit(2, sizeof(void*)); - taosArrayPush(arrs, &mh->total); - taosArrayPush(arrs, &mh->added); + taosArrayPush(arrs, &tr->total); + taosArrayPush(arrs, &tr->added); iUnion(arrs, result); taosArrayDestroy(arrs); - iExcept(result, mh->deled); + iExcept(result, tr->deled); } -static void indexMayMergeToFinalResult(SArray* result, TFileValue* tfv, SIdxMergeHelper* help) { +static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTempResult* tr) { int32_t sz = taosArrayGetSize(result); if (sz > 0) { TFileValue* lv = taosArrayGetP(result, sz - 1); if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) { - sIdxMergeResult(lv->tableId, help); - sIdxMergeHelperClear(help); + sIdxTempResultMergeTo(lv->tableId, tr); + sIdxTempResultClear(tr); taosArrayPush(result, &tfv); } else if (tfv == NULL) { - sIdxMergeResult(lv->tableId, help); + // handle last iterator + sIdxTempResultMergeTo(lv->tableId, tr); } else { + // temp result saved in help tfileValueDestroy(tfv); } } else { taosArrayPush(result, &tfv); } } -static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxMergeHelper* mh) { +static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTempResult* tr) { char* colVal = (cv != NULL) ? cv->colVal : tv->colVal; TFileValue* tfv = tfileValueCreate(colVal); - indexMayMergeToFinalResult(result, tfv, mh); + indexMayMergeTempToFinalResult(result, tfv, tr); if (cv != NULL) { uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0); if (cv->type == ADD_VALUE) { - INDEX_MERGE_ADD_DEL(mh->deled, mh->added, id) + INDEX_MERGE_ADD_DEL(tr->deled, tr->added, id) } else if (cv->type == DEL_VALUE) { - INDEX_MERGE_ADD_DEL(mh->added, mh->deled, id) + INDEX_MERGE_ADD_DEL(tr->added, tr->deled, id) } } if (tv != NULL) { - taosArrayAddAll(mh->total, tv->val); + taosArrayAddAll(tr->total, tv->val); } } -static void indexDestroyTempResult(SArray* result) { +static void indexDestroyFinalResult(SArray* result) { int32_t sz = result ? taosArrayGetSize(result) : 0; for (size_t i = 0; i < sz; i++) { TFileValue* tv = taosArrayGetP(result, i); @@ -543,7 +522,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { bool cn = cacheIter ? cacheIter->next(cacheIter) : false; bool tn = tfileIter ? tfileIter->next(tfileIter) : false; - SIdxMergeHelper* help = sIdxMergeHelperCreate(); + SIdxTempResult* tr = sIdxTempResultCreate(); while (cn == true || tn == true) { IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL; IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL; @@ -557,21 +536,22 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { comp = 1; } if (comp == 0) { - indexMergeCacheAndTFile(result, cv, tv, help); + indexMergeCacheAndTFile(result, cv, tv, tr); cn = cacheIter->next(cacheIter); tn = tfileIter->next(tfileIter); } else if (comp < 0) { - indexMergeCacheAndTFile(result, cv, NULL, help); + indexMergeCacheAndTFile(result, cv, NULL, tr); cn = cacheIter->next(cacheIter); } else { - indexMergeCacheAndTFile(result, NULL, tv, help); + indexMergeCacheAndTFile(result, NULL, tv, tr); tn = tfileIter->next(tfileIter); } } - indexMayMergeToFinalResult(result, NULL, help); + indexMayMergeTempToFinalResult(result, NULL, tr); + sIdxTempResultDestroy(tr); int ret = indexGenTFile(sIdx, pCache, result); - indexDestroyTempResult(result); + indexDestroyFinalResult(result); indexCacheDestroyImm(pCache); @@ -581,8 +561,6 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { tfileReaderUnRef(pReader); indexCacheUnRef(pCache); - sIdxMergeHelperDestroy(help); - int64_t cost = taosGetTimestampUs() - st; if (ret != 0) { indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000); diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 6cfc199afd..f5f46b0617 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -102,7 +102,6 @@ void tfileCacheDestroy(TFileCache* tcache) { if (tcache == NULL) { return; } - // free table cache TFileReader** reader = taosHashIterate(tcache->tableCache, NULL); while (reader) { diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc index e5c79d137f..df9f8b8439 100644 --- a/source/libs/index/test/jsonUT.cc +++ b/source/libs/index/test/jsonUT.cc @@ -105,6 +105,22 @@ TEST_F(JsonEnv, testWriteMillonData) { } indexMultiTermDestroy(terms); } + { + std::string colName("voltagefdadfa"); + std::string colVal("abxxxxxxxxxxxx"); + for (uint i = 0; i < 1000; i++) { + colVal[i % colVal.size()] = '0' + i % 128; + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 1000; i++) { + tIndexJsonPut(index, terms, i); + } + indexMultiTermDestroy(terms); + } + } { std::string colName("voltagefdadfa"); std::string colVal("abxxxxxxxxxxxx"); From cde33ee27a3b0ad07080fcc6c7fa1a3e133e8896 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 5 Mar 2022 11:56:00 +0800 Subject: [PATCH 5/7] io and band --- include/libs/monitor/monitor.h | 28 ++--- include/os/osSysinfo.h | 8 +- source/dnode/mgmt/impl/src/dndMgmt.c | 6 +- source/os/src/osSysinfo.c | 182 ++++++++++++--------------- 4 files changed, 101 insertions(+), 123 deletions(-) diff --git a/include/libs/monitor/monitor.h b/include/libs/monitor/monitor.h index 19b4ad6dc6..1695edd983 100644 --- a/include/libs/monitor/monitor.h +++ b/include/libs/monitor/monitor.h @@ -86,21 +86,21 @@ typedef struct { typedef struct { float uptime; // day - float cpu_engine; - float cpu_system; + double cpu_engine; + double cpu_system; float cpu_cores; - int64_t mem_engine; // KB - int64_t mem_system; // KB - int64_t mem_total; // KB - int64_t disk_engine; // Byte - int64_t disk_used; // Byte - int64_t disk_total; // Byte - int64_t net_in; // Byte - int64_t net_out; // Byte - float io_read; - float io_write; - float io_read_disk; - float io_write_disk; + int64_t mem_engine; // KB + int64_t mem_system; // KB + int64_t mem_total; // KB + int64_t disk_engine; // Byte + int64_t disk_used; // Byte + int64_t disk_total; // Byte + double net_in; // bytes per second + double net_out; // bytes per second + double io_read; + double io_write; + double io_read_disk; + double io_write_disk; int32_t req_select; float req_select_rate; int32_t req_insert; diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index da0a801290..9081fa9715 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -38,15 +38,15 @@ int32_t taosGetEmail(char *email, int32_t maxLen); int32_t taosGetOsReleaseName(char *releaseName, int32_t maxLen); int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores); int32_t taosGetCpuCores(float *numOfCores); -int32_t taosGetCpuUsage(float *cpu_system, float *cpu_engine); +int32_t taosGetCpuUsage(double *cpu_system, double *cpu_engine); int32_t taosGetTotalMemory(int64_t *totalKB); int32_t taosGetProcMemory(int64_t *usedKB); int32_t taosGetSysMemory(int64_t *usedKB); int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize); int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes); -int32_t taosGetProcIOSpeed(float *readKB, float *writeKB, float *readDiskKB, float *writeDiskKB); -int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes); -int32_t taosGetBandSpeed(float *bandSpeedKb); +int32_t taosGetIOSpeed(double *readKB, double *writeKB, double *readDiskKB, double *writeDiskKB); +int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes); +int32_t taosGetBandSpeed(double *receive_bytes_per_sec, double *transmit_bytes_per_sec); int32_t taosSystem(const char *cmd); void taosKillSystem(); diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index 0a3a39e830..ff9eb5d884 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -488,9 +488,9 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { pInfo->mem_total = tsTotalMemoryKB; pInfo->disk_engine = 0; pInfo->disk_used = tsDataSpace.size.used; - pInfo->disk_total = tsDataSpace.size.avail; - taosGetCardInfo(NULL, &pInfo->net_in, &pInfo->net_out); - taosGetProcIOSpeed(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk); + pInfo->disk_total = tsDataSpace.size.total; + taosGetBandSpeed(&pInfo->net_in, &pInfo->net_out); + taosGetIOSpeed(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk); pInfo->req_select = 0; pInfo->req_select_rate = 0; pInfo->req_insert = 0; diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 73b64b5319..8f2820f67a 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -87,7 +87,7 @@ int32_t taosGetCpuCores(float *numOfCores) { return 0; } -int32_t taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) { +int32_t taosGetCpuUsage(double *sysCpuUsage, double *procCpuUsage) { *sysCpuUsage = 0; *procCpuUsage = 0; return 0; @@ -112,15 +112,9 @@ int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) { } } -int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) { - if (bytes) *bytes = 0; - if (rbytes) *rbytes = 0; - if (tbytes) *tbytes = 0; - return 0; -} - -int32_t taosGetBandSpeed(float *bandSpeedKb) { - *bandSpeedKb = 0; +int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) { + *receive_bytes = 0; + *transmit_bytes = 0; return 0; } @@ -140,10 +134,10 @@ void taosGetSystemInfo() { taosGetCpuCores(&tsNumOfCores); taosGetTotalMemory(&tsTotalMemoryKB); - float tmp1, tmp2, tmp3, tmp4; - taosGetBandSpeed(&tmp1); + double tmp1, tmp2, tmp3, tmp4; + taosGetBandSpeed(&tmp1, &tmp2); taosGetCpuUsage(&tmp1, &tmp2); - taosGetProcIOSpeed(&tmp1, &tmp2, &tmp3, &tmp4); + taosGetIOSpeed(&tmp1, &tmp2, &tmp3, &tmp4); } void taosKillSystem() { @@ -241,19 +235,13 @@ int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, in return 0; } -int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) { - if (bytes) *bytes = 0; - if (rbytes) *rbytes = 0; - if (tbytes) *tbytes = 0; +int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) { + *receive_bytes = 0; + *transmit_bytes = 0; return 0; } -int32_t taosGetBandSpeed(float *bandSpeedKb) { - *bandSpeedKb = 0; - return 0; -} - -int32_t taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) { +int32_t taosGetCpuUsage(double *sysCpuUsage, double *procCpuUsage) { *sysCpuUsage = 0; *procCpuUsage = 0; return 0; @@ -370,7 +358,6 @@ int32_t taosGetSysMemory(int64_t *usedKB) { } int32_t taosGetProcMemory(int64_t *usedKB) { - // FILE *fp = fopen(tsProcMemFile, "r"); TdFilePtr pFile = taosOpenFile(tsProcMemFile, TD_FILE_READ | TD_FILE_STREAM); if (pFile == NULL) { // printf("open file:%s failed", tsProcMemFile); @@ -404,7 +391,6 @@ int32_t taosGetProcMemory(int64_t *usedKB) { } static int32_t taosGetSysCpuInfo(SysCpuInfo *cpuInfo) { - // FILE *fp = fopen(tsSysCpuFile, "r"); TdFilePtr pFile = taosOpenFile(tsSysCpuFile, TD_FILE_READ | TD_FILE_STREAM); if (pFile == NULL) { // printf("open file:%s failed", tsSysCpuFile); @@ -429,7 +415,6 @@ static int32_t taosGetSysCpuInfo(SysCpuInfo *cpuInfo) { } static int32_t taosGetProcCpuInfo(ProcCpuInfo *cpuInfo) { - // FILE *fp = fopen(tsProcCpuFile, "r"); TdFilePtr pFile = taosOpenFile(tsProcCpuFile, TD_FILE_READ | TD_FILE_STREAM); if (pFile == NULL) { // printf("open file:%s failed", tsProcCpuFile); @@ -463,7 +448,7 @@ int32_t taosGetCpuCores(float *numOfCores) { return 0; } -int32_t taosGetCpuUsage(float *cpu_system, float *cpu_engine) { +int32_t taosGetCpuUsage(double *cpu_system, double *cpu_engine) { static uint64_t lastSysUsed = 0; static uint64_t lastSysTotal = 0; static uint64_t lastProcTotal = 0; @@ -492,8 +477,8 @@ int32_t taosGetCpuUsage(float *cpu_system, float *cpu_engine) { return -1; } - *cpu_engine = (float)((double)(curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100); - *cpu_system = (float)((double)(curProcTotal - lastProcTotal) / (double)(curSysTotal - lastSysTotal) * 100); + *cpu_engine = (curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100; + *cpu_system = (curProcTotal - lastProcTotal) / (double)(curSysTotal - lastSysTotal) * 100; lastSysUsed = curSysUsed; lastSysTotal = curSysTotal; @@ -514,14 +499,9 @@ int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) { } } -int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) { - if (bytes) *bytes = 0; - // FILE *fp = fopen(tsSysNetFile, "r"); +int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) { TdFilePtr pFile = taosOpenFile(tsSysNetFile, TD_FILE_READ | TD_FILE_STREAM); - if (pFile == NULL) { - // printf("open file:%s failed", tsSysNetFile); - return -1; - } + if (pFile == NULL) return -1; ssize_t _bytes = 0; char *line = NULL; @@ -554,9 +534,8 @@ int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) { "%s %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64, nouse0, &o_rbytes, &rpackts, &nouse1, &nouse2, &nouse3, &nouse4, &nouse5, &nouse6, &o_tbytes, &tpackets); - if (rbytes) *rbytes = o_rbytes; - if (tbytes) *tbytes = o_tbytes; - if (bytes) *bytes += (o_rbytes + o_tbytes); + *receive_bytes = o_rbytes; + *transmit_bytes = o_tbytes; } if (line != NULL) tfree(line); @@ -565,57 +544,52 @@ int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) { return 0; } -int32_t taosGetBandSpeed(float *bandSpeedKb) { - static int64_t lastBytes = 0; - static time_t lastTime = 0; - int64_t curBytes = 0; - time_t curTime = time(NULL); +int32_t taosGetBandSpeed(double *receive_bytes_per_sec, double *transmit_bytes_per_sec) { + static int64_t last_receive_bytes = 0; + static int64_t last_transmit_bytes = 0; + static int64_t last_time = 0; + int64_t cur_receive_bytes = 0; + int64_t cur_transmit_bytes = 0; + int64_t cur_time = taosGetTimestampMs(); - if (taosGetCardInfo(&curBytes, NULL, NULL) != 0) { + if (taosGetCardInfo(&cur_receive_bytes, &cur_transmit_bytes) != 0) { return -1; } - if (lastTime == 0 || lastBytes == 0) { - lastTime = curTime; - lastBytes = curBytes; - *bandSpeedKb = 0; + if (last_time == 0 || last_time >= cur_time) { + last_time = cur_time; + last_receive_bytes = cur_receive_bytes; + last_transmit_bytes = cur_transmit_bytes; + *receive_bytes_per_sec = 0; + *transmit_bytes_per_sec = 0; return 0; } - if (lastTime >= curTime || lastBytes > curBytes) { - lastTime = curTime; - lastBytes = curBytes; - *bandSpeedKb = 0; - return 0; - } + *receive_bytes_per_sec = (cur_receive_bytes - last_receive_bytes) / (double)(cur_time - last_time) * 1000; + *transmit_bytes_per_sec = (cur_transmit_bytes - last_transmit_bytes) / (double)(cur_time - last_time) * 1000; - double totalBytes = (double)(curBytes - lastBytes) / 1024 * 8; // Kb - *bandSpeedKb = (float)(totalBytes / (double)(curTime - lastTime)); + last_time = cur_time; + last_transmit_bytes = cur_transmit_bytes; + last_receive_bytes = cur_receive_bytes; - // //printf("bandwidth lastBytes:%ld, lastTime:%ld, curBytes:%ld, curTime:%ld, - // speed:%f", lastBytes, lastTime, curBytes, curTime, *bandSpeed); - - lastTime = curTime; - lastBytes = curBytes; + if (*receive_bytes_per_sec < 0) *receive_bytes_per_sec = 0; + if (*transmit_bytes_per_sec < 0) *transmit_bytes_per_sec = 0; return 0; } int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { TdFilePtr pFile = taosOpenFile(tsProcIOFile, TD_FILE_READ | TD_FILE_STREAM); - if (pFile == NULL) { - // printf("open file:%s failed", tsProcIOFile); - return -1; - } + if (pFile == NULL) return -1; ssize_t _bytes = 0; char *line = NULL; - char tmp[10]; + char tmp[24]; int readIndex = 0; while (!taosEOFFile(pFile)) { _bytes = taosGetLineFile(pFile, &line); - if ((_bytes < 0) || (line == NULL)) { + if (_bytes < 10 || line == NULL) { break; } if (strstr(line, "rchar:") != NULL) { @@ -624,13 +598,13 @@ int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, in } else if (strstr(line, "wchar:") != NULL) { sscanf(line, "%s %" PRId64, tmp, wchars); readIndex++; - } if (strstr(line, "read_bytes:") != NULL) { + } else if (strstr(line, "read_bytes:") != NULL) { // read_bytes sscanf(line, "%s %" PRId64, tmp, read_bytes); readIndex++; - } else if (strstr(line, "write_bytes::") != NULL) { + } else if (strstr(line, "write_bytes:") != NULL) { // write_bytes sscanf(line, "%s %" PRId64, tmp, write_bytes); readIndex++; - }else { + } else { } if (readIndex >= 4) break; @@ -640,50 +614,54 @@ int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, in taosCloseFile(&pFile); if (readIndex < 4) { - // printf("read file:%s failed", tsProcIOFile); return -1; } return 0; } -int32_t taosGetProcIOSpeed(float *readKB, float *writeKB, float *readDiskKB, float *writeDiskKB) { - static int64_t lastRchar = -1; - static int64_t lastWchar = -1; - static int64_t lastRbyte = -1; - static int64_t lastWbyte = -1; +int32_t taosGetIOSpeed(double *rchar_per_sec, double *wchar_per_sec, double *read_bytes_per_sec, + double *write_bytes_per_sec) { + static int64_t last_rchar = -1; + static int64_t last_wchar = -1; + static int64_t last_read_bytes = -1; + static int64_t last_write_bytes = -1; + static int64_t last_time = 0; - int64_t curRchar = 0; - int64_t curWchar = 0; - int64_t curRbyte = 0; - int64_t curWbyte = 0; + int64_t cur_rchar = 0; + int64_t cur_wchar = 0; + int64_t cur_read_bytes = 0; + int64_t cur_write_bytes = 0; + int64_t cur_time = taosGetTimestampMs(); - if (taosReadProcIO(&curRchar, &curWchar, &curRbyte, &curWbyte) != 0) { + if (taosReadProcIO(&cur_rchar, &cur_wchar, &cur_read_bytes, &cur_write_bytes) != 0) { return -1; } - if (lastRchar == -1 || lastWchar == -1 || lastRbyte == -1 || lastWbyte == -1) { - lastRchar = curRchar; - lastWchar = curWchar; - lastRbyte = curRbyte; - lastWbyte = curWbyte; + if (last_time == 0 || last_time >= cur_time) { + last_time = cur_time; + last_rchar = cur_rchar; + last_wchar = cur_wchar; + last_read_bytes = cur_read_bytes; + last_write_bytes = cur_write_bytes; return -1; } - *readKB = (curRchar - lastRchar) / 1024.0f; - *writeKB = (curWchar - lastWchar) / 1024.0f; - *readDiskKB = (curRbyte - lastRbyte) / 1024.0f; - *writeDiskKB = (curWbyte - lastWbyte) / 1024.0f; + *rchar_per_sec = (cur_rchar - last_rchar) / (double)(cur_time - last_time) * 1000; + *wchar_per_sec = (cur_wchar - last_wchar) / (double)(cur_time - last_time) * 1000; + *read_bytes_per_sec = (cur_read_bytes - last_read_bytes) / (double)(cur_time - last_time) * 1000; + *write_bytes_per_sec = (cur_write_bytes - last_write_bytes) / (double)(cur_time - last_time) * 1000; - if (*readKB < 0) *readKB = 0; - if (*writeKB < 0) *writeKB = 0; - if (*readDiskKB < 0) *readDiskKB = 0; - if (*writeDiskKB < 0) *writeDiskKB = 0; + last_time = cur_time; + last_rchar = cur_rchar; + last_wchar = cur_wchar; + last_read_bytes = cur_read_bytes; + last_write_bytes = cur_write_bytes; - lastRchar = curRchar; - lastWchar = curWchar; - lastRbyte = curRbyte; - lastWbyte = curWbyte; + if (*rchar_per_sec < 0) *rchar_per_sec = 0; + if (*wchar_per_sec < 0) *wchar_per_sec = 0; + if (*read_bytes_per_sec < 0) *read_bytes_per_sec = 0; + if (*write_bytes_per_sec < 0) *write_bytes_per_sec = 0; return 0; } @@ -693,10 +671,10 @@ void taosGetSystemInfo() { taosGetCpuCores(&tsNumOfCores); taosGetTotalMemory(&tsTotalMemoryKB); - float tmp1, tmp2, tmp3, tmp4; - taosGetBandSpeed(&tmp1); + double tmp1, tmp2, tmp3, tmp4; + taosGetBandSpeed(&tmp1, &tmp2); taosGetCpuUsage(&tmp1, &tmp2); - taosGetProcIOSpeed(&tmp1, &tmp2, &tmp3, &tmp4); + taosGetIOSpeed(&tmp1, &tmp2, &tmp3, &tmp4); } void taosKillSystem() { From b75da82c964dcc1c864507a2abc07240aff03f6f Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 5 Mar 2022 12:28:34 +0800 Subject: [PATCH 6/7] sync timeout --- source/libs/sync/inc/syncIO.h | 6 +-- source/libs/sync/inc/syncMessage.h | 20 ++++++-- source/libs/sync/src/syncIO.c | 9 +++- source/libs/sync/src/syncMain.c | 53 ++++++++++++++++++-- source/libs/sync/src/syncMessage.c | 67 +++++++++++++++++++++++++- source/libs/sync/test/syncPingTest.cpp | 1 + 6 files changed, 144 insertions(+), 12 deletions(-) diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index a948de8ac1..160fefd086 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -31,11 +31,11 @@ extern "C" { typedef struct SSyncIO { STaosQueue *pMsgQ; - STaosQset *pQset; + STaosQset * pQset; pthread_t consumerTid; - void *serverRpc; - void *clientRpc; + void * serverRpc; + void * clientRpc; SEpSet myAddr; void *ioTimerTickQ; diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index a2e745b3d9..b022044528 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -30,7 +30,8 @@ extern "C" { // encode as uint32 typedef enum ESyncMessageType { - SYNC_UNKNOWN = 99, + SYNC_UNKNOWN = 77, + SYNC_TIMEOUT = 99, SYNC_PING = 101, SYNC_PING_REPLY = 103, SYNC_CLIENT_REQUEST = 105, @@ -39,7 +40,7 @@ typedef enum ESyncMessageType { SYNC_REQUEST_VOTE_REPLY = 111, SYNC_APPEND_ENTRIES = 113, SYNC_APPEND_ENTRIES_REPLY = 115, - SYNC_TIMEOUT = 117, + } ESyncMessageType; // --------------------------------------------- @@ -48,17 +49,28 @@ cJSON* syncRpcUnknownMsg2Json(); // --------------------------------------------- typedef enum ESyncTimeoutType { - SYNC_TIMEOUT_PING = 0, + SYNC_TIMEOUT_PING = 100, SYNC_TIMEOUT_ELECTION, SYNC_TIMEOUT_HEARTBEAT, } ESyncTimeoutType; typedef struct SyncTimeout { - ESyncTimeoutType type; + uint32_t bytes; + uint32_t msgType; + ESyncTimeoutType timeoutType; void* data; } SyncTimeout; +SyncTimeout* syncTimeoutBuild(); +void syncTimeoutDestroy(SyncTimeout* pMsg); +void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen); +void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg); +void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg); +void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg); +cJSON* syncTimeout2Json(const SyncTimeout* pMsg); +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data); + // --------------------------------------------- typedef struct SyncPing { uint32_t bytes; diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index ffd982f233..55c4d57201 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -246,7 +246,14 @@ static void *syncIOConsumerFunc(void *param) { } } else if (pRpcMsg->msgType == SYNC_TIMEOUT) { - } else { + if (io->FpOnSyncTimeout != NULL) { + SyncTimeout *pSyncMsg; + pSyncMsg = syncTimeoutBuild(); + syncTimeoutFromRpcMsg(pRpcMsg, pSyncMsg); + io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg); + syncTimeoutDestroy(pSyncMsg); + } + } else { ; } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 57bc754735..5f3a276a68 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -27,6 +27,10 @@ static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNo static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); static void syncNodePingTimerCb(void* param, void* tmrId); +static void syncNodeEqPingTimer(void* param, void* tmrId); +static void syncNodeEqElectTimer(void* param, void* tmrId); +static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); + static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg); static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg); @@ -95,7 +99,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->pPingTimer = NULL; pSyncNode->pingTimerMS = 1000; atomic_store_8(&pSyncNode->pingTimerEnable, 0); - pSyncNode->FpPingTimer = syncNodePingTimerCb; + pSyncNode->FpPingTimer = syncNodeEqPingTimer; pSyncNode->pingTimerCounter = 0; pSyncNode->FpOnPing = syncNodeOnPingCb; @@ -104,6 +108,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb; pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb; pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb; + pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb; return pSyncNode; } @@ -329,6 +334,27 @@ static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesR static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { int32_t ret = 0; + sTrace("<-- syncNodeOnTimeoutCb -->"); + + { + cJSON* pJson = syncTimeout2Json(pMsg); + char* serialized = cJSON_Print(pJson); + sTrace("process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + if (pMsg->timeoutType == SYNC_TIMEOUT_PING) { + if (atomic_load_8(&ths->pingTimerEnable)) { + ++(ths->pingTimerCounter); + syncNodePingAll(ths); + } + + } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) { + } else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) { + } else { + } + return ret; } @@ -336,7 +362,6 @@ static void syncNodePingTimerCb(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_8(&pSyncNode->pingTimerEnable)) { ++(pSyncNode->pingTimerCounter); - // pSyncNode->pingTimerMS += 100; sTrace( "syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, " @@ -350,4 +375,26 @@ static void syncNodePingTimerCb(void* param, void* tmrId) { } else { sTrace("syncNodePingTimerCb: pingTimerEnable:%u ", pSyncNode->pingTimerEnable); } -} \ No newline at end of file +} + +static void syncNodeEqPingTimer(void* param, void* tmrId) { + SSyncNode* pSyncNode = (SSyncNode*)param; + if (atomic_load_8(&pSyncNode->pingTimerEnable)) { + // pSyncNode->pingTimerMS += 100; + + SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, pSyncNode); + SRpcMsg rpcMsg; + syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + syncTimeoutDestroy(pSyncMsg); + + taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, + &pSyncNode->pPingTimer); + } else { + sTrace("syncNodeEqPingTimer: pingTimerEnable:%u ", pSyncNode->pingTimerEnable); + } +} + +static void syncNodeEqElectTimer(void* param, void* tmrId) {} + +static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 4dbae9bcba..f1434947a1 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -24,7 +24,12 @@ void onMessage(SRaft* pRaft, void* pMsg) {} cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { cJSON* pRoot; - if (pRpcMsg->msgType == SYNC_PING) { + // in compiler optimization, switch case = if else constants + if (pRpcMsg->msgType == SYNC_TIMEOUT) { + SyncTimeout* pSyncMsg = (SyncTimeout*)pRpcMsg->pCont; + pRoot = syncTimeout2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_PING) { SyncPing* pSyncMsg = (SyncPing*)pRpcMsg->pCont; pRoot = syncPing2Json(pSyncMsg); @@ -73,6 +78,66 @@ cJSON* syncRpcUnknownMsg2Json() { return pJson; } +// ---- message process SyncTimeout---- +SyncTimeout* syncTimeoutBuild() { + uint32_t bytes = sizeof(SyncTimeout); + SyncTimeout* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_TIMEOUT; + return pMsg; +} + +void syncTimeoutDestroy(SyncTimeout* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } +} + +void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); +} + +void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncTimeoutSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg) { + syncTimeoutDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { + char u64buf[128]; + + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType); + snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data); + cJSON_AddStringToObject(pRoot, "data", u64buf); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncTimeout", pRoot); + return pJson; +} + +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data) { + SyncTimeout* pMsg = syncTimeoutBuild(); + pMsg->timeoutType = timeoutType; + pMsg->data = data; + return pMsg; +} + // ---- message process SyncPing---- SyncPing* syncPingBuild(uint32_t dataLen) { uint32_t bytes = SYNC_PING_FIX_LEN + dataLen; diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 06ba5ba6ce..77413a713b 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -78,6 +78,7 @@ int main(int argc, char** argv) { SSyncNode* pSyncNode = doSync(myIndex); gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); From b7e22154b281f36035e702c97fe868bf3024187f Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 5 Mar 2022 15:03:49 +0800 Subject: [PATCH 7/7] sync timeout --- source/libs/sync/inc/syncInt.h | 9 ++-- source/libs/sync/inc/syncUtil.h | 4 +- source/libs/sync/inc/syncVoteMgr.h | 27 +++++++++- source/libs/sync/src/syncMain.c | 20 -------- source/libs/sync/src/syncUtil.c | 7 ++- source/libs/sync/src/syncVoteMgr.c | 80 ++++++++++++++++++++++++++++++ 6 files changed, 118 insertions(+), 29 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index d67b419b24..2b62f9b8b3 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -121,14 +121,17 @@ typedef struct SSyncNode { // init internal SNodeInfo me; + SRaftId raftId; + int32_t peersNum; SNodeInfo peers[TSDB_MAX_REPLICA]; + SRaftId peersId[TSDB_MAX_REPLICA]; + + int32_t replicaNum; + SRaftId replicasId[TSDB_MAX_REPLICA]; // raft algorithm SSyncFSM* pFsm; - SRaftId raftId; - SRaftId peersId[TSDB_MAX_REPLICA]; - int32_t replicaNum; int32_t quorum; // life cycle diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 93d2c12525..e1078d5738 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -39,8 +39,9 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId); +bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2); + // ---- SSyncBuffer ---- -#if 0 void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len); void syncUtilbufDestroy(SSyncBuffer* syncBuf); @@ -48,7 +49,6 @@ void syncUtilbufDestroy(SSyncBuffer* syncBuf); void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest); void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest); -#endif #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h index b841f2e316..e2307e9e66 100644 --- a/source/libs/sync/inc/syncVoteMgr.h +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -24,13 +24,36 @@ extern "C" { #include #include #include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" #include "taosdef.h" typedef struct SVotesGranted { + SyncTerm term; + int32_t quorum; + int32_t votes; + bool toLeader; + SSyncNode *pSyncNode; } SVotesGranted; -typedef struct SVotesResponded { -} SVotesResponded; +SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode); +void voteGrantedDestroy(SVotesGranted *pVotesGranted); +bool voteGrantedMajority(SVotesGranted *pVotesGranted); +void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg); +void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term); + +typedef struct SVotesRespond { + SRaftId (*replicas)[TSDB_MAX_REPLICA]; + bool isRespond[TSDB_MAX_REPLICA]; + int32_t replicaNum; + SyncTerm term; + SSyncNode *pSyncNode; +} SVotesRespond; + +SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode); +bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId); +void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg); +void Reset(SVotesRespond *pVotesRespond, SyncTerm term); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 5f3a276a68..6a0663dd57 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -25,7 +25,6 @@ static int32_t tsNodeRefId = -1; // ------ local funciton --------- static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); -static void syncNodePingTimerCb(void* param, void* tmrId); static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqElectTimer(void* param, void* tmrId); @@ -358,25 +357,6 @@ static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { return ret; } -static void syncNodePingTimerCb(void* param, void* tmrId) { - SSyncNode* pSyncNode = (SSyncNode*)param; - if (atomic_load_8(&pSyncNode->pingTimerEnable)) { - ++(pSyncNode->pingTimerCounter); - - sTrace( - "syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, " - "tmrId:%p ", - pSyncNode->pingTimerCounter, pSyncNode->pingTimerMS, pSyncNode->pPingTimer, tmrId); - - syncNodePingAll(pSyncNode); - - taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, - &pSyncNode->pPingTimer); - } else { - sTrace("syncNodePingTimerCb: pingTimerEnable:%u ", pSyncNode->pingTimerEnable); - } -} - static void syncNodeEqPingTimer(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_8(&pSyncNode->pingTimerEnable)) { diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index b4959a810b..7b4d6ee366 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -68,8 +68,12 @@ void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaft raftId->vgId = vgId; } +bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) { + bool ret = pId1->addr == pId2->addr && pId1->vgId == pId2->vgId; + return ret; +} + // ---- SSyncBuffer ----- -#if 0 void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len) { syncBuf->len = len; syncBuf->data = malloc(syncBuf->len); @@ -87,4 +91,3 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) { dest->data = malloc(dest->len); memcpy(dest->data, src->data, dest->len); } -#endif \ No newline at end of file diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index 02cf4ac033..c9f0ceab57 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -14,3 +14,83 @@ */ #include "syncVoteMgr.h" + +SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) { + SVotesGranted *pVotesGranted = malloc(sizeof(SVotesGranted)); + assert(pVotesGranted != NULL); + memset(pVotesGranted, 0, sizeof(SVotesGranted)); + + pVotesGranted->quorum = pSyncNode->quorum; + pVotesGranted->term = 0; + pVotesGranted->votes = 0; + pVotesGranted->toLeader = false; + pVotesGranted->pSyncNode = pSyncNode; + + return pVotesGranted; +} + +void voteGrantedDestroy(SVotesGranted *pVotesGranted) { + if (pVotesGranted != NULL) { + free(pVotesGranted); + } +} + +bool voteGrantedMajority(SVotesGranted *pVotesGranted) { + bool ret = pVotesGranted->votes >= pVotesGranted->quorum; + return ret; +} + +void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) { + assert(pMsg->voteGranted == true); + assert(pMsg->term == pVotesGranted->term); + pVotesGranted->votes++; +} + +void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) { + pVotesGranted->term = term; + pVotesGranted->votes = 0; + pVotesGranted->toLeader = false; +} + +SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) { + SVotesRespond *pVotesRespond = malloc(sizeof(SVotesRespond)); + assert(pVotesRespond != NULL); + memset(pVotesRespond, 0, sizeof(SVotesRespond)); + + pVotesRespond->replicas = &(pSyncNode->replicasId); + pVotesRespond->replicaNum = pSyncNode->replicaNum; + pVotesRespond->term = 0; + pVotesRespond->pSyncNode = pSyncNode; + + return pVotesRespond; +} + +bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) { + bool ret = false; + for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + if (syncUtilSameId(&(*pVotesRespond->replicas)[i], pRaftId) && pVotesRespond->isRespond[i]) { + ret = true; + break; + } + } + return ret; +} + +void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg) { + assert(pVotesRespond->term == pMsg->term); + for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + if (syncUtilSameId(&(*pVotesRespond->replicas)[i], &pMsg->srcId)) { + assert(pVotesRespond->isRespond[i] == false); + pVotesRespond->isRespond[i] = true; + return; + } + } + assert(0); +} + +void Reset(SVotesRespond *pVotesRespond, SyncTerm term) { + pVotesRespond->term = term; + for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + pVotesRespond->isRespond[i] = false; + } +} \ No newline at end of file