From ee43a70c4dde610857508312228306d1c5260928 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 2 Mar 2022 10:43:59 +0800 Subject: [PATCH] sync modify timer --- source/libs/sync/inc/syncInt.h | 30 ++++++++-------- source/libs/sync/inc/syncMessage.h | 4 +++ source/libs/sync/src/syncIO.c | 11 +++--- source/libs/sync/src/syncMain.c | 49 ++++++++++++++++++-------- source/libs/sync/src/syncMessage.c | 14 ++++++++ source/libs/sync/test/syncPingTest.cpp | 4 ++- 6 files changed, 76 insertions(+), 36 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index d75c5424ea..0901330488 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -48,23 +48,23 @@ extern int32_t sDebugFlag; taosPrintLog("SYN WARN ", sDebugFlag, __VA_ARGS__); \ } \ } -#define sInfo(...) \ - { \ - if (sDebugFlag & DEBUG_INFO) { \ - taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); \ - } \ +#define sInfo(...) \ + { \ + if (sDebugFlag & DEBUG_INFO) { \ + taosPrintLog("SYN INFO ", sDebugFlag, __VA_ARGS__); \ + } \ } -#define sDebug(...) \ - { \ - if (sDebugFlag & DEBUG_DEBUG) { \ - taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); \ - } \ +#define sDebug(...) \ + { \ + if (sDebugFlag & DEBUG_DEBUG) { \ + taosPrintLog("SYN DEBUG ", sDebugFlag, __VA_ARGS__); \ + } \ } -#define sTrace(...) \ - { \ - if (sDebugFlag & DEBUG_TRACE) { \ - taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); \ - } \ +#define sTrace(...) \ + { \ + if (sDebugFlag & DEBUG_TRACE) { \ + taosPrintLog("SYN TRACE ", sDebugFlag, __VA_ARGS__); \ + } \ } struct SRaft; diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index a7de7b9019..603ef00517 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -72,6 +72,10 @@ void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg); cJSON* syncPing2Json(const SyncPing* pMsg); +SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str); + +SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId); + typedef struct SyncPingReply { uint32_t bytes; uint32_t msgType; diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index c035ad5d6b..27434a4029 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -38,6 +38,7 @@ static void syncIOTick(void *param, void *tmrId); // ---------------------------- int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) { + sTrace("syncIOSendMsg ... "); pMsg->handle = NULL; rpcSendRequest(handle, pEpSet, pMsg, NULL); return 0; @@ -74,7 +75,7 @@ static void syncIOTick(void *param, void *tmrId) { taosWriteQitem(io->pMsgQ, pTemp); - taosTmrReset(syncIOTick, 1000, io, io->syncTimerManager, io->syncTimer); + taosTmrReset(syncIOTick, 1000, io, io->syncTimerManager, &io->syncTimer); } static void *syncIOConsumer(void *param) { @@ -191,7 +192,7 @@ static int32_t doSyncIOStart(SSyncIO *io) { { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = 38000; + rpcInit.localPort = 7010; rpcInit.label = "SYNC-IO-SERVER"; rpcInit.numOfThreads = 1; rpcInit.cfp = syncIODoRequest; @@ -209,7 +210,7 @@ static int32_t doSyncIOStart(SSyncIO *io) { } io->epSet.inUse = 0; - addEpIntoEpSet(&io->epSet, "127.0.0.1", 38000); + addEpIntoEpSet(&io->epSet, "127.0.0.1", 7010); // start consumer thread { @@ -221,8 +222,8 @@ static int32_t doSyncIOStart(SSyncIO *io) { } // start tmr thread - // io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); - // io->syncTimer = taosTmrStart(syncIOTick, 1000, io, io->syncTimerManager); + io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); + io->syncTimer = taosTmrStart(syncIOTick, 1000, io, io->syncTimerManager); return 0; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 0aa3e56062..9e335c5e55 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -113,37 +113,36 @@ void syncNodeClose(SSyncNode* pSyncNode) { } void syncNodePingAll(SSyncNode* pSyncNode) { - sTrace("syncNodePingAll %p ", pSyncNode); + sTrace("syncNodePingAll pSyncNode:%p ", pSyncNode); int32_t ret = 0; for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) { - SyncPing* pMsg = syncPingBuild(strlen("ping") + 1); - memcpy(pMsg->data, "ping", strlen("ping") + 1); - syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &pMsg->destId); - pMsg->srcId = pSyncNode->raftId; - ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg); + SRaftId destId; + syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &destId); + SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId); + ret = syncNodePing(pSyncNode, &destId, pMsg); assert(ret == 0); + syncPingDestroy(pMsg); } } void syncNodePingPeers(SSyncNode* pSyncNode) { int32_t ret = 0; for (int i = 0; i < pSyncNode->peersNum; ++i) { - SyncPing* pSyncPing; - SRaftId raftId; - syncUtilnodeInfo2raftId(&pSyncNode->peers[i], pSyncNode->vgId, &raftId); - ret = syncNodePing(pSyncNode, &raftId, pSyncPing); + SRaftId destId; + syncUtilnodeInfo2raftId(&pSyncNode->peers[i], pSyncNode->vgId, &destId); + SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId); + ret = syncNodePing(pSyncNode, &destId, pMsg); assert(ret == 0); + syncPingDestroy(pMsg); } } void syncNodePingSelf(SSyncNode* pSyncNode) { - int32_t ret = 0; - SyncPing* pMsg = syncPingBuild(strlen("ping") + 1); - memcpy(pMsg->data, "ping", strlen("ping") + 1); - pMsg->destId = pSyncNode->raftId; - pMsg->srcId = pSyncNode->raftId; + int32_t ret; + SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &pSyncNode->raftId); ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg); assert(ret == 0); + syncPingDestroy(pMsg); } int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { @@ -167,10 +166,29 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { // ------ local funciton --------- static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { + sTrace("syncNodePing pSyncNode:%p ", pSyncNode); int32_t ret = 0; SRpcMsg rpcMsg; syncPing2RpcMsg(pMsg, &rpcMsg); syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); + + { + cJSON* pJson = syncPing2Json(pMsg); + char* serialized = cJSON_Print(pJson); + sTrace("syncNodePing pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + { + SyncPing* pMsg2 = rpcMsg.pCont; + cJSON* pJson = syncPing2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + sTrace("syncNodePing pMsg2:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + return ret; } @@ -185,6 +203,7 @@ static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pM } static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { + sTrace("syncNodeSendMsgById pSyncNode:%p ", pSyncNode); SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 3122aa66bb..e0975fb3c1 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -80,6 +80,20 @@ cJSON* syncPing2Json(const SyncPing* pMsg) { return pJson; } +SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str) { + uint32_t dataLen = strlen(str) + 1; + SyncPing* pMsg = syncPingBuild(dataLen); + pMsg->srcId = *srcId; + pMsg->destId = *destId; + snprintf(pMsg->data, pMsg->dataLen, "%s", str); + return pMsg; +} + +SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId) { + SyncPing* pMsg = syncPingBuild2(srcId, destId, "ping"); + return pMsg; +} + // ---- message process SyncPingReply---- SyncPingReply* syncPingReplyBuild(uint32_t dataLen) { uint32_t bytes = SYNC_PING_REPLY_FIX_LEN + dataLen; diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 43196bdd1f..b57f6c5259 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -25,7 +25,7 @@ SSyncNode* doSync() { SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = 0; - pCfg->replicaNum = 3; + pCfg->replicaNum = 1; pCfg->nodeInfo[0].nodePort = 7010; taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); @@ -63,6 +63,7 @@ int main() { ret = syncEnvStart(); assert(ret == 0); +/* SSyncNode* pSyncNode = doSync(); ret = syncNodeStartPingTimer(pSyncNode); @@ -72,6 +73,7 @@ int main() { ret = syncNodeStopPingTimer(pSyncNode); assert(ret == 0); +*/ while (1) { taosMsleep(1000);