From 59711f9d988ae76d95c6c62e39a6dbd42fd85f9e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 10 Sep 2020 13:49:53 +0000 Subject: [PATCH] TD-1382 --- src/dnode/src/dnodeModule.c | 21 +++++++++++++++++++-- src/inc/dnode.h | 1 + src/mnode/src/mnodeSdb.c | 8 ++++++++ src/sync/src/syncMain.c | 4 +++- 4 files changed, 31 insertions(+), 3 deletions(-) diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index e1d298089c..bcbc98b5b3 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -16,11 +16,13 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taosdef.h" +#include "taosmsg.h" #include "tglobal.h" #include "mnode.h" #include "http.h" #include "tmqtt.h" #include "monitor.h" +#include "dnode.h" #include "dnodeInt.h" #include "dnodeModule.h" @@ -129,17 +131,32 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) { for (int32_t module = TSDB_MOD_MNODE; module < TSDB_MOD_HTTP; ++module) { bool enableModule = moduleStatus & (1 << module); if (!tsModule[module].enable && enableModule) { - dInfo("module status:%u is received, start %s module", tsModuleStatus, tsModule[module].name); + dInfo("module status:%u is set, start %s module", moduleStatus, tsModule[module].name); tsModule[module].enable = true; dnodeSetModuleStatus(module); (*tsModule[module].startFp)(); } if (tsModule[module].enable && !enableModule) { - dInfo("module status:%u is received, stop %s module", tsModuleStatus, tsModule[module].name); + dInfo("module status:%u is set, stop %s module", moduleStatus, tsModule[module].name); tsModule[module].enable = false; dnodeUnSetModuleStatus(module); (*tsModule[module].stopFp)(); } } } + +void dnodeCheckModules() { + if (tsModuleStatus & TSDB_MOD_MNODE) return; + + SDMMnodeInfos *mnodes = dnodeGetMnodeInfos(); + for (int32_t i = 0; i < mnodes->nodeNum; ++i) { + SDMMnodeInfo *node = &mnodes->nodeInfos[i]; + if (node->nodeId == dnodeGetDnodeId()) { + uint32_t moduleStatus = tsModuleStatus | (1 << TSDB_MOD_MNODE);; + dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus); + dnodeProcessModuleStatus(moduleStatus); + break; + } + } +} diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 093ce93205..028041b2d2 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -43,6 +43,7 @@ void dnodeGetMnodeEpSetForPeer(void *epSet); void dnodeGetMnodeEpSetForShell(void *epSet); void * dnodeGetMnodeInfos(); int32_t dnodeGetDnodeId(); +void dnodeCheckModules(); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 8928a6622d..8e99c65955 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -22,6 +22,7 @@ #include "tqueue.h" #include "twal.h" #include "tsync.h" +#include "ttimer.h" #include "tglobal.h" #include "dnode.h" #include "mnode.h" @@ -88,6 +89,8 @@ typedef struct { SSdbWriteWorker *writeWorker; } SSdbWriteWorkerPool; +extern void * tsMnodeTmr; +static void * tsUpdateSyncTmr; static SSdbObject tsSdbObj = {0}; static taos_qset tsSdbWriteQset; static taos_qall tsSdbWriteQall; @@ -290,11 +293,16 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { taosFreeQitem(pOper); } +static void sdbUpdateSyncTmrFp(void *param, void *tmrId) { sdbUpdateSync(); } + void sdbUpdateSync() { if (!mnodeIsRunning()) { mDebug("mnode not start yet, update sync info later"); + dnodeCheckModules(); + taosTmrReset(sdbUpdateSyncTmrFp, 1000, NULL, tsMnodeTmr, &tsUpdateSyncTmr); return; } + mDebug("update sync info in sdb"); SSyncCfg syncCfg = {0}; int32_t index = 0; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 861c3366f6..314a6fcd74 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -512,7 +512,9 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { int ret = strcmp(pPeer->fqdn, tsNodeFqdn); if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) { sDebug("%s, start to check peer connection", pPeer->id); - taosTmrReset(syncCheckPeerConnection, 100 + (pNode->vgId * 10) % 100, pPeer, syncTmrCtrl, &pPeer->timer); + int32_t checkMs = 100 + (pNode->vgId * 10) % 100; + if (pNode->vgId) checkMs = tsStatusInterval * 3000 + 100; + taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer); } syncAddNodeRef(pNode);