TD-1382
This commit is contained in:
parent
fa5d57a49a
commit
59711f9d98
|
@ -16,11 +16,13 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
|
#include "taosmsg.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "mnode.h"
|
#include "mnode.h"
|
||||||
#include "http.h"
|
#include "http.h"
|
||||||
#include "tmqtt.h"
|
#include "tmqtt.h"
|
||||||
#include "monitor.h"
|
#include "monitor.h"
|
||||||
|
#include "dnode.h"
|
||||||
#include "dnodeInt.h"
|
#include "dnodeInt.h"
|
||||||
#include "dnodeModule.h"
|
#include "dnodeModule.h"
|
||||||
|
|
||||||
|
@ -129,17 +131,32 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) {
|
||||||
for (int32_t module = TSDB_MOD_MNODE; module < TSDB_MOD_HTTP; ++module) {
|
for (int32_t module = TSDB_MOD_MNODE; module < TSDB_MOD_HTTP; ++module) {
|
||||||
bool enableModule = moduleStatus & (1 << module);
|
bool enableModule = moduleStatus & (1 << module);
|
||||||
if (!tsModule[module].enable && enableModule) {
|
if (!tsModule[module].enable && enableModule) {
|
||||||
dInfo("module status:%u is received, start %s module", tsModuleStatus, tsModule[module].name);
|
dInfo("module status:%u is set, start %s module", moduleStatus, tsModule[module].name);
|
||||||
tsModule[module].enable = true;
|
tsModule[module].enable = true;
|
||||||
dnodeSetModuleStatus(module);
|
dnodeSetModuleStatus(module);
|
||||||
(*tsModule[module].startFp)();
|
(*tsModule[module].startFp)();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsModule[module].enable && !enableModule) {
|
if (tsModule[module].enable && !enableModule) {
|
||||||
dInfo("module status:%u is received, stop %s module", tsModuleStatus, tsModule[module].name);
|
dInfo("module status:%u is set, stop %s module", moduleStatus, tsModule[module].name);
|
||||||
tsModule[module].enable = false;
|
tsModule[module].enable = false;
|
||||||
dnodeUnSetModuleStatus(module);
|
dnodeUnSetModuleStatus(module);
|
||||||
(*tsModule[module].stopFp)();
|
(*tsModule[module].stopFp)();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dnodeCheckModules() {
|
||||||
|
if (tsModuleStatus & TSDB_MOD_MNODE) return;
|
||||||
|
|
||||||
|
SDMMnodeInfos *mnodes = dnodeGetMnodeInfos();
|
||||||
|
for (int32_t i = 0; i < mnodes->nodeNum; ++i) {
|
||||||
|
SDMMnodeInfo *node = &mnodes->nodeInfos[i];
|
||||||
|
if (node->nodeId == dnodeGetDnodeId()) {
|
||||||
|
uint32_t moduleStatus = tsModuleStatus | (1 << TSDB_MOD_MNODE);;
|
||||||
|
dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus);
|
||||||
|
dnodeProcessModuleStatus(moduleStatus);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -43,6 +43,7 @@ void dnodeGetMnodeEpSetForPeer(void *epSet);
|
||||||
void dnodeGetMnodeEpSetForShell(void *epSet);
|
void dnodeGetMnodeEpSetForShell(void *epSet);
|
||||||
void * dnodeGetMnodeInfos();
|
void * dnodeGetMnodeInfos();
|
||||||
int32_t dnodeGetDnodeId();
|
int32_t dnodeGetDnodeId();
|
||||||
|
void dnodeCheckModules();
|
||||||
|
|
||||||
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
|
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
|
||||||
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
|
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
#include "tqueue.h"
|
#include "tqueue.h"
|
||||||
#include "twal.h"
|
#include "twal.h"
|
||||||
#include "tsync.h"
|
#include "tsync.h"
|
||||||
|
#include "ttimer.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "dnode.h"
|
#include "dnode.h"
|
||||||
#include "mnode.h"
|
#include "mnode.h"
|
||||||
|
@ -88,6 +89,8 @@ typedef struct {
|
||||||
SSdbWriteWorker *writeWorker;
|
SSdbWriteWorker *writeWorker;
|
||||||
} SSdbWriteWorkerPool;
|
} SSdbWriteWorkerPool;
|
||||||
|
|
||||||
|
extern void * tsMnodeTmr;
|
||||||
|
static void * tsUpdateSyncTmr;
|
||||||
static SSdbObject tsSdbObj = {0};
|
static SSdbObject tsSdbObj = {0};
|
||||||
static taos_qset tsSdbWriteQset;
|
static taos_qset tsSdbWriteQset;
|
||||||
static taos_qall tsSdbWriteQall;
|
static taos_qall tsSdbWriteQall;
|
||||||
|
@ -290,11 +293,16 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
|
||||||
taosFreeQitem(pOper);
|
taosFreeQitem(pOper);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void sdbUpdateSyncTmrFp(void *param, void *tmrId) { sdbUpdateSync(); }
|
||||||
|
|
||||||
void sdbUpdateSync() {
|
void sdbUpdateSync() {
|
||||||
if (!mnodeIsRunning()) {
|
if (!mnodeIsRunning()) {
|
||||||
mDebug("mnode not start yet, update sync info later");
|
mDebug("mnode not start yet, update sync info later");
|
||||||
|
dnodeCheckModules();
|
||||||
|
taosTmrReset(sdbUpdateSyncTmrFp, 1000, NULL, tsMnodeTmr, &tsUpdateSyncTmr);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
mDebug("update sync info in sdb");
|
||||||
|
|
||||||
SSyncCfg syncCfg = {0};
|
SSyncCfg syncCfg = {0};
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
|
|
|
@ -512,7 +512,9 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
|
||||||
int ret = strcmp(pPeer->fqdn, tsNodeFqdn);
|
int ret = strcmp(pPeer->fqdn, tsNodeFqdn);
|
||||||
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
|
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
|
||||||
sDebug("%s, start to check peer connection", pPeer->id);
|
sDebug("%s, start to check peer connection", pPeer->id);
|
||||||
taosTmrReset(syncCheckPeerConnection, 100 + (pNode->vgId * 10) % 100, pPeer, syncTmrCtrl, &pPeer->timer);
|
int32_t checkMs = 100 + (pNode->vgId * 10) % 100;
|
||||||
|
if (pNode->vgId) checkMs = tsStatusInterval * 3000 + 100;
|
||||||
|
taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer);
|
||||||
}
|
}
|
||||||
|
|
||||||
syncAddNodeRef(pNode);
|
syncAddNodeRef(pNode);
|
||||||
|
|
Loading…
Reference in New Issue