monitor
This commit is contained in:
parent
10fd86209d
commit
4cf8c0acf4
|
@ -52,6 +52,12 @@ extern bool tsEnableSlaveQuery;
|
||||||
extern bool tsPrintAuth;
|
extern bool tsPrintAuth;
|
||||||
extern int64_t tsTickPerDay[3];
|
extern int64_t tsTickPerDay[3];
|
||||||
|
|
||||||
|
// monitor
|
||||||
|
extern bool tsEnableMonitor;
|
||||||
|
extern int32_t tsMonitorInterval;
|
||||||
|
extern char tsMonitorFqdn[];
|
||||||
|
extern uint16_t tsMonitorPort;
|
||||||
|
|
||||||
// query buffer management
|
// query buffer management
|
||||||
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
|
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
|
||||||
extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node
|
extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node
|
||||||
|
|
|
@ -46,6 +46,12 @@ int32_t tsMaxBinaryDisplayWidth = 30;
|
||||||
bool tsEnableSlaveQuery = 1;
|
bool tsEnableSlaveQuery = 1;
|
||||||
bool tsPrintAuth = 0;
|
bool tsPrintAuth = 0;
|
||||||
|
|
||||||
|
// monitor
|
||||||
|
bool tsEnableMonitor = 1;
|
||||||
|
int32_t tsMonitorInterval = 5;
|
||||||
|
char tsMonitorFqdn[TSDB_FQDN_LEN] = {0};
|
||||||
|
uint16_t tsMonitorPort = 6043;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* denote if the server needs to compress response message at the application layer to client, including query rsp,
|
* denote if the server needs to compress response message at the application layer to client, including query rsp,
|
||||||
* metricmeta rsp, and multi-meter query rsp message body. The client compress the submit message to server.
|
* metricmeta rsp, and multi-meter query rsp message body. The client compress the submit message to server.
|
||||||
|
@ -314,6 +320,12 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1;
|
if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1;
|
||||||
if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1;
|
if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1;
|
||||||
if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1;
|
if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1;
|
||||||
|
|
||||||
|
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
|
||||||
|
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1;
|
||||||
|
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1;
|
||||||
|
if (cfgAddInt32(pCfg, "monitorPort", tsMonitorPort, 1, 65056, 0) != 0) return -1;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -345,7 +357,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosSetClientCfg(SConfig *pCfg) {
|
static void taosSetClientCfg(SConfig *pCfg) {
|
||||||
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_EP_LEN);
|
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
|
||||||
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
|
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
|
||||||
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
|
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
|
||||||
|
|
||||||
|
@ -425,6 +437,11 @@ static void taosSetServerCfg(SConfig *pCfg) {
|
||||||
tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval;
|
tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval;
|
||||||
tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->bval;
|
tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->bval;
|
||||||
|
|
||||||
|
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
|
||||||
|
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
|
||||||
|
tstrncpy(tsMonitorFqdn, cfgGetItem(pCfg, "monitorFqdn")->str, TSDB_FQDN_LEN);
|
||||||
|
tsMonitorPort = (uint16_t)cfgGetItem(pCfg, "monitorPort")->i32;
|
||||||
|
|
||||||
if (tsQueryBufferSize >= 0) {
|
if (tsQueryBufferSize >= 0) {
|
||||||
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
|
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,7 +140,7 @@ static int32_t dndInitDir(SDnode *pDnode, SDnodeObjCfg *pCfg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndCloseImp(SDnode *pDnode) {
|
static void dndCloseDir(SDnode *pDnode) {
|
||||||
tfree(pDnode->dir.mnode);
|
tfree(pDnode->dir.mnode);
|
||||||
tfree(pDnode->dir.vnodes);
|
tfree(pDnode->dir.vnodes);
|
||||||
tfree(pDnode->dir.dnode);
|
tfree(pDnode->dir.dnode);
|
||||||
|
@ -260,7 +260,7 @@ void dndClose(SDnode *pDnode) {
|
||||||
dndCleanupMgmt(pDnode);
|
dndCleanupMgmt(pDnode);
|
||||||
tfsClose(pDnode->pTfs);
|
tfsClose(pDnode->pTfs);
|
||||||
|
|
||||||
dndCloseImp(pDnode);
|
dndCloseDir(pDnode);
|
||||||
free(pDnode);
|
free(pDnode);
|
||||||
dInfo("dnode object is closed, data:%p", pDnode);
|
dInfo("dnode object is closed, data:%p", pDnode);
|
||||||
}
|
}
|
||||||
|
@ -288,9 +288,8 @@ int32_t dndInit() {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SVnodeOpt vnodeOpt = {.nthreads = tsNumOfCommitThreads,
|
SVnodeOpt vnodeOpt = {
|
||||||
.putReqToVQueryQFp = dndPutReqToVQueryQ,
|
.nthreads = tsNumOfCommitThreads, .putReqToVQueryQFp = dndPutReqToVQueryQ, .sendReqToDnodeFp = dndSendReqToDnode};
|
||||||
.sendReqToDnodeFp = dndSendReqToDnode};
|
|
||||||
|
|
||||||
if (vnodeInit(&vnodeOpt) != 0) {
|
if (vnodeInit(&vnodeOpt) != 0) {
|
||||||
dError("failed to init vnode since %s", terrstr());
|
dError("failed to init vnode since %s", terrstr());
|
||||||
|
|
|
@ -473,19 +473,39 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void dndSendMonitorReport(SDnode *pDnode) {
|
||||||
|
if (!tsEnableMonitor || tsMonitorFqdn[0] == 0) return;
|
||||||
|
|
||||||
|
dTrace("pDnode:%p, send monitor report to %s:%u", pDnode, tsMonitorFqdn, tsMonitorPort);
|
||||||
|
}
|
||||||
|
|
||||||
static void *dnodeThreadRoutine(void *param) {
|
static void *dnodeThreadRoutine(void *param) {
|
||||||
SDnode *pDnode = param;
|
SDnode *pDnode = param;
|
||||||
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||||
int32_t ms = tsStatusInterval * 1000;
|
int64_t lastStatusTime = taosGetTimestampMs();
|
||||||
|
int64_t lastMonitorTime = lastStatusTime;
|
||||||
|
|
||||||
setThreadName("dnode-hb");
|
setThreadName("dnode-hb");
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
pthread_testcancel();
|
pthread_testcancel();
|
||||||
taosMsleep(ms);
|
taosMsleep(200);
|
||||||
|
if (dndGetStat(pDnode) != DND_STAT_RUNNING || pMgmt->dropped) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (dndGetStat(pDnode) == DND_STAT_RUNNING && !pMgmt->statusSent && !pMgmt->dropped) {
|
int64_t curTime = taosGetTimestampMs();
|
||||||
|
|
||||||
|
float statusInterval = (curTime - lastStatusTime) / 1000.0f;
|
||||||
|
if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) {
|
||||||
dndSendStatusReq(pDnode);
|
dndSendStatusReq(pDnode);
|
||||||
|
lastStatusTime = curTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
float monitorInterval = (curTime - lastMonitorTime) / 1000.0f;
|
||||||
|
if (monitorInterval >= tsMonitorInterval) {
|
||||||
|
dndSendMonitorReport(pDnode);
|
||||||
|
lastMonitorTime = curTime;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue