monitor client
This commit is contained in:
parent
68e2e4df74
commit
0a16b75831
|
@ -147,6 +147,7 @@ extern int32_t tsMetaCacheMaxSize;
|
||||||
extern int32_t tsSlowLogThreshold;
|
extern int32_t tsSlowLogThreshold;
|
||||||
extern int32_t tsSlowLogScope;
|
extern int32_t tsSlowLogScope;
|
||||||
extern int32_t tsTimeSeriesThreshold;
|
extern int32_t tsTimeSeriesThreshold;
|
||||||
|
extern bool enableSlowQueryMonitor;
|
||||||
|
|
||||||
// client
|
// client
|
||||||
extern int32_t tsMinSlidingTime;
|
extern int32_t tsMinSlidingTime;
|
||||||
|
|
|
@ -0,0 +1,50 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef TDENGINE_CLIENT_MONITOR_H
|
||||||
|
#define TDENGINE_CLIENT_MONITOR_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include "taos_monitor.h"
|
||||||
|
#include "thash.h"
|
||||||
|
#include "query.h"
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char clusterKey[512];
|
||||||
|
SEpSet epSet;
|
||||||
|
void* pTransporter;
|
||||||
|
taos_collector_registry_t* registry;
|
||||||
|
taos_collector_t* colector;
|
||||||
|
taos_counter_t* slow_query_counter;
|
||||||
|
taos_counter_t* select_counter;
|
||||||
|
SHashObj* counters;
|
||||||
|
} ClientMonitor;
|
||||||
|
|
||||||
|
void clusterMonitorInit(const char* clusterKey, SEpSet epSet, void* pTransporter);
|
||||||
|
void clusterMonitorClose(const char* clusterKey);
|
||||||
|
taos_counter_t* createClusterCounter(const char* clusterKey, const char* name, const char* help, size_t label_key_count,
|
||||||
|
const char** label_keys);
|
||||||
|
int taosClusterCounterInc(const char* clusterKey, const char* counterName, const char** label_values);
|
||||||
|
|
||||||
|
void cluster_monitor_stop();
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif // TDENGINE_CLIENT_MONITOR_H
|
|
@ -20,7 +20,7 @@ target_include_directories(
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
taos
|
taos
|
||||||
INTERFACE api
|
INTERFACE api
|
||||||
PRIVATE os util common transport nodes parser command planner catalog scheduler function qcom geometry
|
PRIVATE os util common transport monitor nodes parser command planner catalog scheduler function qcom geometry
|
||||||
)
|
)
|
||||||
|
|
||||||
if(TD_DARWIN_ARM64)
|
if(TD_DARWIN_ARM64)
|
||||||
|
@ -61,7 +61,7 @@ target_include_directories(
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
taos_static
|
taos_static
|
||||||
INTERFACE api
|
INTERFACE api
|
||||||
PRIVATE os util common transport nodes parser command planner catalog scheduler function qcom geometry
|
PRIVATE os util common transport monitor nodes parser command planner catalog scheduler function qcom geometry
|
||||||
)
|
)
|
||||||
|
|
||||||
if(${BUILD_TEST})
|
if(${BUILD_TEST})
|
||||||
|
|
|
@ -391,6 +391,8 @@ void hbRemoveAppHbMrg(SAppHbMgr** pAppHbMgr);
|
||||||
void destroyAllRequests(SHashObj* pRequests);
|
void destroyAllRequests(SHashObj* pRequests);
|
||||||
void stopAllRequests(SHashObj* pRequests);
|
void stopAllRequests(SHashObj* pRequests);
|
||||||
|
|
||||||
|
SAppInstInfo* getAppInstInfo(const char* clusterKey);
|
||||||
|
|
||||||
// conn level
|
// conn level
|
||||||
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
|
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
|
||||||
void hbDeregisterConn(STscObj* pTscObj, SClientHbKey connKey);
|
void hbDeregisterConn(STscObj* pTscObj, SClientHbKey connKey);
|
||||||
|
@ -426,6 +428,13 @@ void freeQueryParam(SSyncQueryParam* param);
|
||||||
int32_t clientParseSqlImpl(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effeciveUser, SParseSqlRes* pRes);
|
int32_t clientParseSqlImpl(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effeciveUser, SParseSqlRes* pRes);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
void clusterSlowQueryMonitorInit(const char* clusterKey);
|
||||||
|
void clusterSlowQueryLog(const char* clusterKey, int32_t cost);
|
||||||
|
void SlowQueryLog(int64_t connId, int32_t cost);
|
||||||
|
|
||||||
|
void clusterSelectMonitorInit(const char* clusterKey);
|
||||||
|
void clusterSelectLog(const char* clusterKey);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -114,6 +114,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
|
||||||
taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 ", Duration:%" PRId64 "us, SQL:%s",
|
taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 ", Duration:%" PRId64 "us, SQL:%s",
|
||||||
taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
|
taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
|
||||||
pRequest->sqlstr);
|
pRequest->sqlstr);
|
||||||
|
SlowQueryLog(pTscObj->connId, duration);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#include "cJSON.h"
|
#include "cJSON.h"
|
||||||
#include "clientInt.h"
|
#include "clientInt.h"
|
||||||
|
#include "clientMonitor.h"
|
||||||
#include "clientLog.h"
|
#include "clientLog.h"
|
||||||
#include "command.h"
|
#include "command.h"
|
||||||
#include "scheduler.h"
|
#include "scheduler.h"
|
||||||
|
@ -157,6 +158,8 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
|
||||||
tscDebug("new app inst mgr %p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port);
|
tscDebug("new app inst mgr %p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port);
|
||||||
|
|
||||||
pInst = &p;
|
pInst = &p;
|
||||||
|
|
||||||
|
clusterSlowQueryMonitorInit(p->instKey);
|
||||||
} else {
|
} else {
|
||||||
ASSERTS((*pInst) && (*pInst)->pAppHbMgr, "*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
|
ASSERTS((*pInst) && (*pInst)->pAppHbMgr, "*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
|
||||||
// reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
|
// reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
|
||||||
|
@ -166,9 +169,19 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
|
||||||
taosThreadMutexUnlock(&appInfo.mutex);
|
taosThreadMutexUnlock(&appInfo.mutex);
|
||||||
|
|
||||||
taosMemoryFreeClear(key);
|
taosMemoryFreeClear(key);
|
||||||
|
|
||||||
return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType);
|
return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SAppInstInfo* getAppInstInfo(const char* clusterKey) {
|
||||||
|
SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
|
||||||
|
if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
|
||||||
|
return *ppAppInstInfo;
|
||||||
|
} else {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void freeQueryParam(SSyncQueryParam* param) {
|
void freeQueryParam(SSyncQueryParam* param) {
|
||||||
if (param == NULL) return;
|
if (param == NULL) return;
|
||||||
tsem_destroy(¶m->sem);
|
tsem_destroy(¶m->sem);
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#include "clientInt.h"
|
||||||
|
#include "clientMonitor.h"
|
||||||
|
#include "clientLog.h"
|
||||||
|
|
||||||
|
const char* selectMonitorName = "slow_query";
|
||||||
|
const char* selectMonitorHelp = "slow query log when cost > 3s";
|
||||||
|
const int selectMonitorLabelCount = 1;
|
||||||
|
const char* selectMonitorLabels[] = {"default"};
|
||||||
|
|
||||||
|
void clusterSelectMonitorInit(const char* clusterKey) {
|
||||||
|
SAppInstInfo* pAppInstInfo = getAppInstInfo(clusterKey);
|
||||||
|
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
|
||||||
|
clusterMonitorInit(clusterKey, epSet, pAppInstInfo->pTransporter);
|
||||||
|
createClusterCounter(clusterKey, selectMonitorName, selectMonitorHelp, selectMonitorLabelCount, selectMonitorLabels);
|
||||||
|
}
|
||||||
|
|
||||||
|
void clusterSelectLog(const char* clusterKey) {
|
||||||
|
const char* selectMonitorLabelValues[] = {"default"};
|
||||||
|
taosClusterCounterInc(clusterKey, selectMonitorName, selectMonitorLabelValues);
|
||||||
|
}
|
||||||
|
|
||||||
|
void selectLog(int64_t connId) {
|
||||||
|
STscObj* pTscObj = acquireTscObj(connId);
|
||||||
|
if (pTscObj != NULL) {
|
||||||
|
if(pTscObj->pAppInfo == NULL) {
|
||||||
|
tscLog("selectLog, not found pAppInfo");
|
||||||
|
}
|
||||||
|
return clusterSelectLog(pTscObj->pAppInfo->instKey);
|
||||||
|
} else {
|
||||||
|
tscLog("selectLog, not found connect ID");
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,73 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#include "clientInt.h"
|
||||||
|
#include "clientMonitor.h"
|
||||||
|
#include "clientLog.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
|
||||||
|
const char* slowQueryName = "slow_query";
|
||||||
|
const char* slowQueryHelp = "slow query log when cost > 3s";
|
||||||
|
const int slowQueryLabelCount = 1;
|
||||||
|
const char* slowQueryLabels[] = {"cost"};
|
||||||
|
|
||||||
|
const int64_t msInSeconds = 1000;
|
||||||
|
const int64_t msInMinutes = 60 * msInSeconds;
|
||||||
|
|
||||||
|
static const char* getSlowQueryLableCostDesc(int64_t cost) {
|
||||||
|
if (cost >= 30 * msInMinutes) {
|
||||||
|
return " > 30 min";
|
||||||
|
} else if (cost >= 10 * msInMinutes) {
|
||||||
|
return " > 10 min";
|
||||||
|
} else if (cost >= 5 * msInMinutes) {
|
||||||
|
return " > 5 min";
|
||||||
|
} else if (cost >= 1 * msInMinutes) {
|
||||||
|
return " > 1 min";
|
||||||
|
} else if (cost >= 30 * msInSeconds) {
|
||||||
|
return " > 30 seconds";
|
||||||
|
} else if (cost >= 10 * msInSeconds) {
|
||||||
|
return " > 10 seconds";
|
||||||
|
} else if (cost >= 5 * msInSeconds) {
|
||||||
|
return " > 5 seconds";
|
||||||
|
} else if (cost >= 3 * msInSeconds) {
|
||||||
|
return " > 3 seconds";
|
||||||
|
}
|
||||||
|
return "< 3 s";
|
||||||
|
}
|
||||||
|
|
||||||
|
void clusterSlowQueryMonitorInit(const char* clusterKey) {
|
||||||
|
if (!enableSlowQueryMonitor) return;
|
||||||
|
SAppInstInfo* pAppInstInfo = getAppInstInfo(clusterKey);
|
||||||
|
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
|
||||||
|
clusterMonitorInit(clusterKey, epSet, pAppInstInfo->pTransporter);
|
||||||
|
createClusterCounter(clusterKey, slowQueryName, slowQueryHelp, slowQueryLabelCount, slowQueryLabels);
|
||||||
|
}
|
||||||
|
|
||||||
|
void clusterSlowQueryLog(const char* clusterKey, int32_t cost) {
|
||||||
|
const char* slowQueryLabelValues[] = {getSlowQueryLableCostDesc(cost)};
|
||||||
|
taosClusterCounterInc(clusterKey, slowQueryName, slowQueryLabelValues);
|
||||||
|
}
|
||||||
|
|
||||||
|
void SlowQueryLog(int64_t connId, int32_t cost) {
|
||||||
|
if (!enableSlowQueryMonitor) return;
|
||||||
|
STscObj* pTscObj = acquireTscObj(connId);
|
||||||
|
if (pTscObj != NULL) {
|
||||||
|
if(pTscObj->pAppInfo == NULL) {
|
||||||
|
tscLog("SlowQueryLog, not found pAppInfo");
|
||||||
|
}
|
||||||
|
return clusterSlowQueryLog(pTscObj->pAppInfo->instKey, cost);
|
||||||
|
} else {
|
||||||
|
tscLog("SlowQueryLog, not found connect ID");
|
||||||
|
}
|
||||||
|
}
|
|
@ -23,6 +23,12 @@ TARGET_LINK_LIBRARIES(
|
||||||
PUBLIC os util common transport parser catalog scheduler function gtest taos_static qcom geometry
|
PUBLIC os util common transport parser catalog scheduler function gtest taos_static qcom geometry
|
||||||
)
|
)
|
||||||
|
|
||||||
|
ADD_EXECUTABLE(clientMonitorTest clientMonitorTests.cpp)
|
||||||
|
TARGET_LINK_LIBRARIES(
|
||||||
|
clientMonitorTest
|
||||||
|
PUBLIC os util common transport monitor parser catalog scheduler function gtest taos_static qcom executor
|
||||||
|
)
|
||||||
|
|
||||||
TARGET_INCLUDE_DIRECTORIES(
|
TARGET_INCLUDE_DIRECTORIES(
|
||||||
clientTest
|
clientTest
|
||||||
PUBLIC "${TD_SOURCE_DIR}/include/client/"
|
PUBLIC "${TD_SOURCE_DIR}/include/client/"
|
||||||
|
@ -41,7 +47,18 @@ TARGET_INCLUDE_DIRECTORIES(
|
||||||
PRIVATE "${TD_SOURCE_DIR}/source/client/inc"
|
PRIVATE "${TD_SOURCE_DIR}/source/client/inc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
TARGET_INCLUDE_DIRECTORIES(
|
||||||
|
clientMonitorTest
|
||||||
|
PUBLIC "${TD_SOURCE_DIR}/include/client/"
|
||||||
|
PRIVATE "${TD_SOURCE_DIR}/source/client/inc"
|
||||||
|
)
|
||||||
|
|
||||||
add_test(
|
add_test(
|
||||||
NAME smlTest
|
NAME smlTest
|
||||||
COMMAND smlTest
|
COMMAND smlTest
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# add_test(
|
||||||
|
# NAME clientMonitorTest
|
||||||
|
# COMMAND clientMonitorTest
|
||||||
|
# )
|
||||||
|
|
|
@ -0,0 +1,81 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <iostream>
|
||||||
|
#include "clientInt.h"
|
||||||
|
#include "clientMonitor.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "thash.h"
|
||||||
|
|
||||||
|
#pragma GCC diagnostic push
|
||||||
|
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
||||||
|
#pragma GCC diagnostic ignored "-Wunused-function"
|
||||||
|
#pragma GCC diagnostic ignored "-Wunused-variable"
|
||||||
|
#pragma GCC diagnostic ignored "-Wsign-compare"
|
||||||
|
|
||||||
|
#include "executor.h"
|
||||||
|
#include "taos.h"
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
testing::InitGoogleTest(&argc, argv);
|
||||||
|
return RUN_ALL_TESTS();
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(clientMonitorTest, monitorTest) {
|
||||||
|
const char* cluster1 = "cluster1";
|
||||||
|
const char* cluster2 = "cluster2";
|
||||||
|
SEpSet epSet;
|
||||||
|
clusterMonitorInit(cluster1, epSet, NULL);
|
||||||
|
const char* counterName1 = "slow_query";
|
||||||
|
const char* counterName2 = "select_count";
|
||||||
|
const char* help1 = "test for slowQuery";
|
||||||
|
const char* help2 = "test for selectSQL";
|
||||||
|
const char* lables[] = {"lable1"};
|
||||||
|
taos_counter_t* c1 = createClusterCounter(cluster1, counterName1, help1, 1, lables);
|
||||||
|
ASSERT_TRUE(c1 != NULL);
|
||||||
|
taos_counter_t* c2 = createClusterCounter(cluster1, counterName2, help2, 1, lables);
|
||||||
|
ASSERT_TRUE(c2 != NULL);
|
||||||
|
ASSERT_TRUE(c1 != c2);
|
||||||
|
taos_counter_t* c21 = createClusterCounter(cluster2, counterName1, help2, 1, lables);
|
||||||
|
ASSERT_TRUE(c21 == NULL);
|
||||||
|
clusterMonitorInit(cluster2, epSet, NULL);
|
||||||
|
c21 = createClusterCounter(cluster2, counterName1, help2, 1, lables);
|
||||||
|
ASSERT_TRUE(c21 != NULL);
|
||||||
|
int i = 0;
|
||||||
|
while (i < 12) {
|
||||||
|
taosMsleep(10);
|
||||||
|
++i;
|
||||||
|
} clusterMonitorClose(cluster1);
|
||||||
|
clusterMonitorClose(cluster2);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(clientMonitorTest, sendTest) {
|
||||||
|
TAOS* taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
|
||||||
|
ASSERT_TRUE(taos != NULL);
|
||||||
|
printf("connect taosd sucessfully.\n");
|
||||||
|
|
||||||
|
int64_t connId = *(int64_t *)taos;
|
||||||
|
SlowQueryLog(connId, 1000);
|
||||||
|
int i = 0;
|
||||||
|
while (i < 20) {
|
||||||
|
SlowQueryLog(connId, i * 1000);
|
||||||
|
taosMsleep(10);
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_close(taos);
|
||||||
|
}
|
|
@ -156,6 +156,7 @@ int32_t tsMetaCacheMaxSize = -1; // MB
|
||||||
int32_t tsSlowLogThreshold = 3; // seconds
|
int32_t tsSlowLogThreshold = 3; // seconds
|
||||||
int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL;
|
int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL;
|
||||||
int32_t tsTimeSeriesThreshold = 50;
|
int32_t tsTimeSeriesThreshold = 50;
|
||||||
|
bool enableSlowQueryMonitor = false;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* denote if the server needs to compress response message at the application layer to client, including query rsp,
|
* denote if the server needs to compress response message at the application layer to client, including query rsp,
|
||||||
|
@ -479,6 +480,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
||||||
if (cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, CFG_SCOPE_CLIENT) != 0) return -1;
|
if (cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, CFG_SCOPE_CLIENT) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 0, INT32_MAX, CFG_SCOPE_CLIENT) != 0) return -1;
|
if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 0, INT32_MAX, CFG_SCOPE_CLIENT) != 0) return -1;
|
||||||
if (cfgAddString(pCfg, "slowLogScope", "", CFG_SCOPE_CLIENT) != 0) return -1;
|
if (cfgAddString(pCfg, "slowLogScope", "", CFG_SCOPE_CLIENT) != 0) return -1;
|
||||||
|
if (cfgAddBool(pCfg, "enableSlowQueryMonitor", enableSlowQueryMonitor, CFG_SCOPE_CLIENT) != 0) return -1;
|
||||||
|
|
||||||
tsNumOfRpcThreads = tsNumOfCores / 2;
|
tsNumOfRpcThreads = tsNumOfCores / 2;
|
||||||
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
|
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
|
||||||
|
@ -991,6 +993,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
||||||
if (taosSetSlowLogScope(cfgGetItem(pCfg, "slowLogScope")->str)) {
|
if (taosSetSlowLogScope(cfgGetItem(pCfg, "slowLogScope")->str)) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
enableSlowQueryMonitor = cfgGetItem(pCfg, "enableSlowQueryMonitor")->bval;
|
||||||
|
|
||||||
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
|
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,7 @@ target_include_directories(
|
||||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||||
)
|
)
|
||||||
|
|
||||||
target_link_libraries(monitor os util common transport monitorfw)
|
target_link_libraries(monitor os util common qcom transport monitorfw)
|
||||||
|
|
||||||
if(${BUILD_TEST})
|
if(${BUILD_TEST})
|
||||||
add_subdirectory(test)
|
add_subdirectory(test)
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#define _TD_MONITOR_INT_H_
|
#define _TD_MONITOR_INT_H_
|
||||||
|
|
||||||
#include "monitor.h"
|
#include "monitor.h"
|
||||||
|
#include "query.h"
|
||||||
|
|
||||||
#include "tjson.h"
|
#include "tjson.h"
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,189 @@
|
||||||
|
#include "clientMonitor.h"
|
||||||
|
#include "os.h"
|
||||||
|
#include "tmisce.h"
|
||||||
|
#include "ttime.h"
|
||||||
|
#include "ttimer.h"
|
||||||
|
|
||||||
|
SRWLatch monitorLock;
|
||||||
|
void* tmrClientMonitor;
|
||||||
|
tmr_h tmrStartHandle;
|
||||||
|
SHashObj* clusterMonitorInfoTable;
|
||||||
|
|
||||||
|
static const int interval = 1000; // ms
|
||||||
|
static const int sendSize = 10;
|
||||||
|
static const int sleepTimeMS = 100;
|
||||||
|
|
||||||
|
int32_t sendReport(ClientMonitor* pMonitor, char* pCont);
|
||||||
|
void generateClusterReport(ClientMonitor* pMonitor, bool send) {
|
||||||
|
char ts[50];
|
||||||
|
sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI));
|
||||||
|
char* pCont = (char*)taos_collector_registry_bridge(pMonitor->registry, ts, "%" PRId64);
|
||||||
|
if (send && strlen(pCont) != TSDB_CODE_SUCCESS) {
|
||||||
|
if (sendReport(pMonitor, pCont) == 0) {
|
||||||
|
taos_collector_registry_clear_out(pMonitor->registry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void reportSendProcess(void* param, void* tmrId) {
|
||||||
|
taosRLockLatch(&monitorLock);
|
||||||
|
taosTmrReset(reportSendProcess, interval, NULL, tmrClientMonitor, &tmrStartHandle);
|
||||||
|
|
||||||
|
static int index = 0;
|
||||||
|
index++;
|
||||||
|
ClientMonitor** ppMonitor = (ClientMonitor**)taosHashIterate(clusterMonitorInfoTable, NULL);
|
||||||
|
while (ppMonitor != NULL && *ppMonitor != NULL) {
|
||||||
|
ClientMonitor* pMonitor = *ppMonitor;
|
||||||
|
generateClusterReport(*ppMonitor, index == sendSize);
|
||||||
|
ppMonitor = taosHashIterate(clusterMonitorInfoTable, ppMonitor);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (index == sendSize) index = 0;
|
||||||
|
taosRUnLockLatch(&monitorLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
void monitorClientInitOnce() {
|
||||||
|
static int8_t init = 0;
|
||||||
|
if (atomic_exchange_8(&init, 1) == 0) {
|
||||||
|
uInfo("tscMonitorInit once.");
|
||||||
|
clusterMonitorInfoTable =
|
||||||
|
(SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
|
tmrClientMonitor = taosTmrInit(0, 0, 0, "MONITOR");
|
||||||
|
tmrStartHandle = taosTmrStart(reportSendProcess, interval, NULL, tmrClientMonitor);
|
||||||
|
taosInitRWLatch(&monitorLock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void createMonitorClient(const char* clusterKey, SEpSet epSet, void* pTransporter) {
|
||||||
|
taosWLockLatch(&monitorLock);
|
||||||
|
if (clusterKey == NULL || strlen(clusterKey) == 0) {
|
||||||
|
uError("createMonitorClient failed, clusterKey is NULL");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey)) == NULL) {
|
||||||
|
uInfo("createMonitorClient for %s.", clusterKey);
|
||||||
|
ClientMonitor* pMonitor = taosMemoryCalloc(1, sizeof(ClientMonitor));
|
||||||
|
snprintf(pMonitor->clusterKey, sizeof(pMonitor->clusterKey), "%s", clusterKey);
|
||||||
|
pMonitor->registry = taos_collector_registry_new(clusterKey);
|
||||||
|
pMonitor->colector = taos_collector_new(clusterKey);
|
||||||
|
pMonitor->epSet = epSet;
|
||||||
|
pMonitor->pTransporter = pTransporter;
|
||||||
|
|
||||||
|
taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector);
|
||||||
|
pMonitor->counters =
|
||||||
|
(SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
|
taosHashPut(clusterMonitorInfoTable, clusterKey, strlen(clusterKey), &pMonitor, sizeof(ClientMonitor*));
|
||||||
|
uInfo("createMonitorClient for %s finished %p.", clusterKey, pMonitor);
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&monitorLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
|
static int32_t emptyRspNum = 0;
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
uError("found error in monitorReport send callback, code:%d, please check the network.", code);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t sendReport(ClientMonitor* pMonitor, char* pCont) {
|
||||||
|
SStatisReq sStatisReq;
|
||||||
|
sStatisReq.pCont = pCont;
|
||||||
|
sStatisReq.contLen = strlen(pCont);
|
||||||
|
|
||||||
|
int tlen = tSerializeSStatisReq(NULL, 0, &sStatisReq);
|
||||||
|
if (tlen < 0) return 0;
|
||||||
|
void* buf = taosMemoryMalloc(tlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tSerializeSStatisReq(buf, tlen, &sStatisReq);
|
||||||
|
|
||||||
|
SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
|
if (pInfo == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
pInfo->fp = monitorReportAsyncCB;
|
||||||
|
pInfo->msgInfo.pData = buf;
|
||||||
|
pInfo->msgInfo.len = tlen;
|
||||||
|
pInfo->msgType = TDMT_MND_STATIS;
|
||||||
|
// pInfo->param = taosMemoryMalloc(sizeof(int32_t));
|
||||||
|
// *(int32_t*)pInfo->param = i;
|
||||||
|
pInfo->paramFreeFp = taosMemoryFree;
|
||||||
|
pInfo->requestId = tGenIdPI64();
|
||||||
|
pInfo->requestObjRefId = 0;
|
||||||
|
|
||||||
|
int64_t transporterId = 0;
|
||||||
|
return asyncSendMsgToServer(pMonitor->pTransporter, &pMonitor->epSet, &transporterId, pInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
void clusterMonitorInit(const char* clusterKey, SEpSet epSet, void* pTransporter) {
|
||||||
|
monitorClientInitOnce();
|
||||||
|
createMonitorClient(clusterKey, epSet, pTransporter);
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_counter_t* createClusterCounter(const char* clusterKey, const char* name, const char* help, size_t label_key_count,
|
||||||
|
const char** label_keys) {
|
||||||
|
ClientMonitor** ppMonitor = (ClientMonitor**)taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey));
|
||||||
|
|
||||||
|
if (ppMonitor != NULL && *ppMonitor != NULL) {
|
||||||
|
ClientMonitor* pMonitor = *ppMonitor;
|
||||||
|
taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, name, strlen(name));
|
||||||
|
if (ppCounter != NULL && *ppCounter != NULL) {
|
||||||
|
taosHashRemove(pMonitor->counters, name, strlen(name));
|
||||||
|
uInfo("createClusterCounter remove old counter: %s.", name);
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys);
|
||||||
|
if (newCounter != NULL) {
|
||||||
|
taos_collector_add_metric(pMonitor->colector, newCounter);
|
||||||
|
taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, sizeof(taos_counter_t*));
|
||||||
|
uInfo("createClusterCounter %s(%p):%s : %p.", pMonitor->clusterKey, pMonitor, name, newCounter);
|
||||||
|
return newCounter;
|
||||||
|
} else {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int taosClusterCounterInc(const char* clusterKey, const char* counterName, const char** label_values) {
|
||||||
|
taosRLockLatch(&monitorLock);
|
||||||
|
ClientMonitor** ppMonitor = (ClientMonitor**)taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey));
|
||||||
|
|
||||||
|
if (ppMonitor != NULL && *ppMonitor != NULL) {
|
||||||
|
ClientMonitor* pMonitor = *ppMonitor;
|
||||||
|
taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName));
|
||||||
|
if (ppCounter != NULL && *ppCounter != NULL) {
|
||||||
|
int res = taos_counter_inc(*ppCounter, label_values);
|
||||||
|
} else {
|
||||||
|
uError("taosClusterCounterInc not found pCounter %s:%s.", clusterKey, counterName);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
uError("taosClusterCounterInc not found pMonitor %s.", clusterKey);
|
||||||
|
}
|
||||||
|
taosRUnLockLatch(&monitorLock);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void clusterMonitorClose(const char* clusterKey) {
|
||||||
|
taosWLockLatch(&monitorLock);
|
||||||
|
ClientMonitor** ppMonitor = (ClientMonitor**)taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey));
|
||||||
|
|
||||||
|
if (ppMonitor != NULL && *ppMonitor != NULL) {
|
||||||
|
ClientMonitor* pMonitor = *ppMonitor;
|
||||||
|
uInfo("clusterMonitorClose valule:%p clusterKey:%s.", pMonitor, pMonitor->clusterKey);
|
||||||
|
taosHashCleanup(pMonitor->counters);
|
||||||
|
taos_collector_registry_destroy(pMonitor->registry);
|
||||||
|
taosMemoryFree(pMonitor);
|
||||||
|
taosHashRemove(clusterMonitorInfoTable, clusterKey, strlen(clusterKey));
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&monitorLock);
|
||||||
|
}
|
Loading…
Reference in New Issue