diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 1d0da9e150..1765b0a234 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -147,6 +147,7 @@ extern int32_t tsMetaCacheMaxSize; extern int32_t tsSlowLogThreshold; extern int32_t tsSlowLogScope; extern int32_t tsTimeSeriesThreshold; +extern bool enableSlowQueryMonitor; // client extern int32_t tsMinSlidingTime; diff --git a/include/libs/monitor/clientMonitor.h b/include/libs/monitor/clientMonitor.h new file mode 100644 index 0000000000..0d238157c5 --- /dev/null +++ b/include/libs/monitor/clientMonitor.h @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_CLIENT_MONITOR_H +#define TDENGINE_CLIENT_MONITOR_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "taos_monitor.h" +#include "thash.h" +#include "query.h" + +typedef struct { + char clusterKey[512]; + SEpSet epSet; + void* pTransporter; + taos_collector_registry_t* registry; + taos_collector_t* colector; + taos_counter_t* slow_query_counter; + taos_counter_t* select_counter; + SHashObj* counters; +} ClientMonitor; + +void clusterMonitorInit(const char* clusterKey, SEpSet epSet, void* pTransporter); +void clusterMonitorClose(const char* clusterKey); +taos_counter_t* createClusterCounter(const char* clusterKey, const char* name, const char* help, size_t label_key_count, + const char** label_keys); +int taosClusterCounterInc(const char* clusterKey, const char* counterName, const char** label_values); + +void cluster_monitor_stop(); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_CLIENT_MONITOR_H diff --git a/source/client/CMakeLists.txt b/source/client/CMakeLists.txt index f089247859..a17c27c297 100644 --- a/source/client/CMakeLists.txt +++ b/source/client/CMakeLists.txt @@ -20,7 +20,7 @@ target_include_directories( target_link_libraries( taos INTERFACE api - PRIVATE os util common transport nodes parser command planner catalog scheduler function qcom geometry + PRIVATE os util common transport monitor nodes parser command planner catalog scheduler function qcom geometry ) if(TD_DARWIN_ARM64) @@ -61,7 +61,7 @@ target_include_directories( target_link_libraries( taos_static INTERFACE api - PRIVATE os util common transport nodes parser command planner catalog scheduler function qcom geometry + PRIVATE os util common transport monitor nodes parser command planner catalog scheduler function qcom geometry ) if(${BUILD_TEST}) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 75003d76d8..ef19fb7c51 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -391,6 +391,8 @@ void hbRemoveAppHbMrg(SAppHbMgr** pAppHbMgr); void destroyAllRequests(SHashObj* pRequests); void stopAllRequests(SHashObj* pRequests); +SAppInstInfo* getAppInstInfo(const char* clusterKey); + // conn level int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); void hbDeregisterConn(STscObj* pTscObj, SClientHbKey connKey); @@ -426,6 +428,13 @@ void freeQueryParam(SSyncQueryParam* param); int32_t clientParseSqlImpl(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effeciveUser, SParseSqlRes* pRes); #endif +void clusterSlowQueryMonitorInit(const char* clusterKey); +void clusterSlowQueryLog(const char* clusterKey, int32_t cost); +void SlowQueryLog(int64_t connId, int32_t cost); + +void clusterSelectMonitorInit(const char* clusterKey); +void clusterSelectLog(const char* clusterKey); + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index b36ef20b53..26b076e5ee 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -114,6 +114,7 @@ static void deregisterRequest(SRequestObj *pRequest) { taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 ", Duration:%" PRId64 "us, SQL:%s", taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration, pRequest->sqlstr); + SlowQueryLog(pTscObj->connId, duration); } } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 395a396d89..e3aaa63c87 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -15,6 +15,7 @@ #include "cJSON.h" #include "clientInt.h" +#include "clientMonitor.h" #include "clientLog.h" #include "command.h" #include "scheduler.h" @@ -157,6 +158,8 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas tscDebug("new app inst mgr %p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port); pInst = &p; + + clusterSlowQueryMonitorInit(p->instKey); } else { ASSERTS((*pInst) && (*pInst)->pAppHbMgr, "*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL); // reset to 0 in case of conn with duplicated user key but its user has ever been dropped. @@ -166,9 +169,19 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas taosThreadMutexUnlock(&appInfo.mutex); taosMemoryFreeClear(key); + return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType); } +SAppInstInfo* getAppInstInfo(const char* clusterKey) { + SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey)); + if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) { + return *ppAppInstInfo; + } else { + return NULL; + } +} + void freeQueryParam(SSyncQueryParam* param) { if (param == NULL) return; tsem_destroy(¶m->sem); diff --git a/source/client/src/selectMonitor.c b/source/client/src/selectMonitor.c new file mode 100644 index 0000000000..0cb2073961 --- /dev/null +++ b/source/client/src/selectMonitor.c @@ -0,0 +1,46 @@ + /* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "clientInt.h" +#include "clientMonitor.h" +#include "clientLog.h" + +const char* selectMonitorName = "slow_query"; +const char* selectMonitorHelp = "slow query log when cost > 3s"; +const int selectMonitorLabelCount = 1; +const char* selectMonitorLabels[] = {"default"}; + +void clusterSelectMonitorInit(const char* clusterKey) { + SAppInstInfo* pAppInstInfo = getAppInstInfo(clusterKey); + SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); + clusterMonitorInit(clusterKey, epSet, pAppInstInfo->pTransporter); + createClusterCounter(clusterKey, selectMonitorName, selectMonitorHelp, selectMonitorLabelCount, selectMonitorLabels); +} + +void clusterSelectLog(const char* clusterKey) { + const char* selectMonitorLabelValues[] = {"default"}; + taosClusterCounterInc(clusterKey, selectMonitorName, selectMonitorLabelValues); +} + +void selectLog(int64_t connId) { + STscObj* pTscObj = acquireTscObj(connId); + if (pTscObj != NULL) { + if(pTscObj->pAppInfo == NULL) { + tscLog("selectLog, not found pAppInfo"); + } + return clusterSelectLog(pTscObj->pAppInfo->instKey); + } else { + tscLog("selectLog, not found connect ID"); + } +} diff --git a/source/client/src/slowQueryMonitor.c b/source/client/src/slowQueryMonitor.c new file mode 100644 index 0000000000..b42f860d67 --- /dev/null +++ b/source/client/src/slowQueryMonitor.c @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "clientInt.h" +#include "clientMonitor.h" +#include "clientLog.h" +#include "tglobal.h" + +const char* slowQueryName = "slow_query"; +const char* slowQueryHelp = "slow query log when cost > 3s"; +const int slowQueryLabelCount = 1; +const char* slowQueryLabels[] = {"cost"}; + +const int64_t msInSeconds = 1000; +const int64_t msInMinutes = 60 * msInSeconds; + +static const char* getSlowQueryLableCostDesc(int64_t cost) { + if (cost >= 30 * msInMinutes) { + return " > 30 min"; + } else if (cost >= 10 * msInMinutes) { + return " > 10 min"; + } else if (cost >= 5 * msInMinutes) { + return " > 5 min"; + } else if (cost >= 1 * msInMinutes) { + return " > 1 min"; + } else if (cost >= 30 * msInSeconds) { + return " > 30 seconds"; + } else if (cost >= 10 * msInSeconds) { + return " > 10 seconds"; + } else if (cost >= 5 * msInSeconds) { + return " > 5 seconds"; + } else if (cost >= 3 * msInSeconds) { + return " > 3 seconds"; + } + return "< 3 s"; +} + +void clusterSlowQueryMonitorInit(const char* clusterKey) { + if (!enableSlowQueryMonitor) return; + SAppInstInfo* pAppInstInfo = getAppInstInfo(clusterKey); + SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); + clusterMonitorInit(clusterKey, epSet, pAppInstInfo->pTransporter); + createClusterCounter(clusterKey, slowQueryName, slowQueryHelp, slowQueryLabelCount, slowQueryLabels); +} + +void clusterSlowQueryLog(const char* clusterKey, int32_t cost) { + const char* slowQueryLabelValues[] = {getSlowQueryLableCostDesc(cost)}; + taosClusterCounterInc(clusterKey, slowQueryName, slowQueryLabelValues); +} + +void SlowQueryLog(int64_t connId, int32_t cost) { + if (!enableSlowQueryMonitor) return; + STscObj* pTscObj = acquireTscObj(connId); + if (pTscObj != NULL) { + if(pTscObj->pAppInfo == NULL) { + tscLog("SlowQueryLog, not found pAppInfo"); + } + return clusterSlowQueryLog(pTscObj->pAppInfo->instKey, cost); + } else { + tscLog("SlowQueryLog, not found connect ID"); + } +} diff --git a/source/client/test/CMakeLists.txt b/source/client/test/CMakeLists.txt index 91f0d1eef8..7c3847e4a1 100644 --- a/source/client/test/CMakeLists.txt +++ b/source/client/test/CMakeLists.txt @@ -23,6 +23,12 @@ TARGET_LINK_LIBRARIES( PUBLIC os util common transport parser catalog scheduler function gtest taos_static qcom geometry ) +ADD_EXECUTABLE(clientMonitorTest clientMonitorTests.cpp) +TARGET_LINK_LIBRARIES( + clientMonitorTest + PUBLIC os util common transport monitor parser catalog scheduler function gtest taos_static qcom executor +) + TARGET_INCLUDE_DIRECTORIES( clientTest PUBLIC "${TD_SOURCE_DIR}/include/client/" @@ -41,7 +47,18 @@ TARGET_INCLUDE_DIRECTORIES( PRIVATE "${TD_SOURCE_DIR}/source/client/inc" ) +TARGET_INCLUDE_DIRECTORIES( + clientMonitorTest + PUBLIC "${TD_SOURCE_DIR}/include/client/" + PRIVATE "${TD_SOURCE_DIR}/source/client/inc" +) + add_test( NAME smlTest COMMAND smlTest ) + +# add_test( +# NAME clientMonitorTest +# COMMAND clientMonitorTest +# ) diff --git a/source/client/test/clientMonitorTests.cpp b/source/client/test/clientMonitorTests.cpp new file mode 100644 index 0000000000..8882e268e7 --- /dev/null +++ b/source/client/test/clientMonitorTests.cpp @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include "clientInt.h" +#include "clientMonitor.h" +#include "taoserror.h" +#include "tglobal.h" +#include "thash.h" + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +#include "executor.h" +#include "taos.h" + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +TEST(clientMonitorTest, monitorTest) { + const char* cluster1 = "cluster1"; + const char* cluster2 = "cluster2"; + SEpSet epSet; + clusterMonitorInit(cluster1, epSet, NULL); + const char* counterName1 = "slow_query"; + const char* counterName2 = "select_count"; + const char* help1 = "test for slowQuery"; + const char* help2 = "test for selectSQL"; + const char* lables[] = {"lable1"}; + taos_counter_t* c1 = createClusterCounter(cluster1, counterName1, help1, 1, lables); + ASSERT_TRUE(c1 != NULL); + taos_counter_t* c2 = createClusterCounter(cluster1, counterName2, help2, 1, lables); + ASSERT_TRUE(c2 != NULL); + ASSERT_TRUE(c1 != c2); + taos_counter_t* c21 = createClusterCounter(cluster2, counterName1, help2, 1, lables); + ASSERT_TRUE(c21 == NULL); + clusterMonitorInit(cluster2, epSet, NULL); + c21 = createClusterCounter(cluster2, counterName1, help2, 1, lables); + ASSERT_TRUE(c21 != NULL); + int i = 0; + while (i < 12) { + taosMsleep(10); + ++i; + } clusterMonitorClose(cluster1); + clusterMonitorClose(cluster2); +} + +TEST(clientMonitorTest, sendTest) { + TAOS* taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0); + ASSERT_TRUE(taos != NULL); + printf("connect taosd sucessfully.\n"); + + int64_t connId = *(int64_t *)taos; + SlowQueryLog(connId, 1000); + int i = 0; + while (i < 20) { + SlowQueryLog(connId, i * 1000); + taosMsleep(10); + ++i; + } + + taos_close(taos); +} diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 8e824b4d67..d755ef0075 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -156,6 +156,7 @@ int32_t tsMetaCacheMaxSize = -1; // MB int32_t tsSlowLogThreshold = 3; // seconds int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL; int32_t tsTimeSeriesThreshold = 50; +bool enableSlowQueryMonitor = false; /* * denote if the server needs to compress response message at the application layer to client, including query rsp, @@ -479,6 +480,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, CFG_SCOPE_CLIENT) != 0) return -1; if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 0, INT32_MAX, CFG_SCOPE_CLIENT) != 0) return -1; if (cfgAddString(pCfg, "slowLogScope", "", CFG_SCOPE_CLIENT) != 0) return -1; + if (cfgAddBool(pCfg, "enableSlowQueryMonitor", enableSlowQueryMonitor, CFG_SCOPE_CLIENT) != 0) return -1; tsNumOfRpcThreads = tsNumOfCores / 2; tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS); @@ -991,6 +993,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { if (taosSetSlowLogScope(cfgGetItem(pCfg, "slowLogScope")->str)) { return -1; } + enableSlowQueryMonitor = cfgGetItem(pCfg, "enableSlowQueryMonitor")->bval; tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32; diff --git a/source/libs/monitor/CMakeLists.txt b/source/libs/monitor/CMakeLists.txt index 13523fd3cc..cc8f40fa4c 100644 --- a/source/libs/monitor/CMakeLists.txt +++ b/source/libs/monitor/CMakeLists.txt @@ -6,7 +6,7 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) -target_link_libraries(monitor os util common transport monitorfw) +target_link_libraries(monitor os util common qcom transport monitorfw) if(${BUILD_TEST}) add_subdirectory(test) diff --git a/source/libs/monitor/inc/monInt.h b/source/libs/monitor/inc/monInt.h index ae1af4ba62..c5219c60b2 100644 --- a/source/libs/monitor/inc/monInt.h +++ b/source/libs/monitor/inc/monInt.h @@ -17,6 +17,7 @@ #define _TD_MONITOR_INT_H_ #include "monitor.h" +#include "query.h" #include "tjson.h" diff --git a/source/libs/monitor/src/clientMonitor.c b/source/libs/monitor/src/clientMonitor.c new file mode 100644 index 0000000000..941155f2fe --- /dev/null +++ b/source/libs/monitor/src/clientMonitor.c @@ -0,0 +1,189 @@ +#include "clientMonitor.h" +#include "os.h" +#include "tmisce.h" +#include "ttime.h" +#include "ttimer.h" + +SRWLatch monitorLock; +void* tmrClientMonitor; +tmr_h tmrStartHandle; +SHashObj* clusterMonitorInfoTable; + +static const int interval = 1000; // ms +static const int sendSize = 10; +static const int sleepTimeMS = 100; + +int32_t sendReport(ClientMonitor* pMonitor, char* pCont); +void generateClusterReport(ClientMonitor* pMonitor, bool send) { + char ts[50]; + sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI)); + char* pCont = (char*)taos_collector_registry_bridge(pMonitor->registry, ts, "%" PRId64); + if (send && strlen(pCont) != TSDB_CODE_SUCCESS) { + if (sendReport(pMonitor, pCont) == 0) { + taos_collector_registry_clear_out(pMonitor->registry); + } + } +} + +void reportSendProcess(void* param, void* tmrId) { + taosRLockLatch(&monitorLock); + taosTmrReset(reportSendProcess, interval, NULL, tmrClientMonitor, &tmrStartHandle); + + static int index = 0; + index++; + ClientMonitor** ppMonitor = (ClientMonitor**)taosHashIterate(clusterMonitorInfoTable, NULL); + while (ppMonitor != NULL && *ppMonitor != NULL) { + ClientMonitor* pMonitor = *ppMonitor; + generateClusterReport(*ppMonitor, index == sendSize); + ppMonitor = taosHashIterate(clusterMonitorInfoTable, ppMonitor); + } + + if (index == sendSize) index = 0; + taosRUnLockLatch(&monitorLock); +} + +void monitorClientInitOnce() { + static int8_t init = 0; + if (atomic_exchange_8(&init, 1) == 0) { + uInfo("tscMonitorInit once."); + clusterMonitorInfoTable = + (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + + tmrClientMonitor = taosTmrInit(0, 0, 0, "MONITOR"); + tmrStartHandle = taosTmrStart(reportSendProcess, interval, NULL, tmrClientMonitor); + taosInitRWLatch(&monitorLock); + } +} + +void createMonitorClient(const char* clusterKey, SEpSet epSet, void* pTransporter) { + taosWLockLatch(&monitorLock); + if (clusterKey == NULL || strlen(clusterKey) == 0) { + uError("createMonitorClient failed, clusterKey is NULL"); + return; + } + + if (taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey)) == NULL) { + uInfo("createMonitorClient for %s.", clusterKey); + ClientMonitor* pMonitor = taosMemoryCalloc(1, sizeof(ClientMonitor)); + snprintf(pMonitor->clusterKey, sizeof(pMonitor->clusterKey), "%s", clusterKey); + pMonitor->registry = taos_collector_registry_new(clusterKey); + pMonitor->colector = taos_collector_new(clusterKey); + pMonitor->epSet = epSet; + pMonitor->pTransporter = pTransporter; + + taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector); + pMonitor->counters = + (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + + taosHashPut(clusterMonitorInfoTable, clusterKey, strlen(clusterKey), &pMonitor, sizeof(ClientMonitor*)); + uInfo("createMonitorClient for %s finished %p.", clusterKey, pMonitor); + } + taosWUnLockLatch(&monitorLock); +} + +static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) { + static int32_t emptyRspNum = 0; + if (TSDB_CODE_SUCCESS != code) { + uError("found error in monitorReport send callback, code:%d, please check the network.", code); + } + return code; +} + +int32_t sendReport(ClientMonitor* pMonitor, char* pCont) { + SStatisReq sStatisReq; + sStatisReq.pCont = pCont; + sStatisReq.contLen = strlen(pCont); + + int tlen = tSerializeSStatisReq(NULL, 0, &sStatisReq); + if (tlen < 0) return 0; + void* buf = taosMemoryMalloc(tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + tSerializeSStatisReq(buf, tlen, &sStatisReq); + + SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + if (pInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pInfo->fp = monitorReportAsyncCB; + pInfo->msgInfo.pData = buf; + pInfo->msgInfo.len = tlen; + pInfo->msgType = TDMT_MND_STATIS; + // pInfo->param = taosMemoryMalloc(sizeof(int32_t)); + // *(int32_t*)pInfo->param = i; + pInfo->paramFreeFp = taosMemoryFree; + pInfo->requestId = tGenIdPI64(); + pInfo->requestObjRefId = 0; + + int64_t transporterId = 0; + return asyncSendMsgToServer(pMonitor->pTransporter, &pMonitor->epSet, &transporterId, pInfo); +} + +void clusterMonitorInit(const char* clusterKey, SEpSet epSet, void* pTransporter) { + monitorClientInitOnce(); + createMonitorClient(clusterKey, epSet, pTransporter); +} + +taos_counter_t* createClusterCounter(const char* clusterKey, const char* name, const char* help, size_t label_key_count, + const char** label_keys) { + ClientMonitor** ppMonitor = (ClientMonitor**)taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey)); + + if (ppMonitor != NULL && *ppMonitor != NULL) { + ClientMonitor* pMonitor = *ppMonitor; + taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, name, strlen(name)); + if (ppCounter != NULL && *ppCounter != NULL) { + taosHashRemove(pMonitor->counters, name, strlen(name)); + uInfo("createClusterCounter remove old counter: %s.", name); + } + + taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys); + if (newCounter != NULL) { + taos_collector_add_metric(pMonitor->colector, newCounter); + taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, sizeof(taos_counter_t*)); + uInfo("createClusterCounter %s(%p):%s : %p.", pMonitor->clusterKey, pMonitor, name, newCounter); + return newCounter; + } else { + return NULL; + } + } else { + return NULL; + } + return NULL; +} + +int taosClusterCounterInc(const char* clusterKey, const char* counterName, const char** label_values) { + taosRLockLatch(&monitorLock); + ClientMonitor** ppMonitor = (ClientMonitor**)taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey)); + + if (ppMonitor != NULL && *ppMonitor != NULL) { + ClientMonitor* pMonitor = *ppMonitor; + taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName)); + if (ppCounter != NULL && *ppCounter != NULL) { + int res = taos_counter_inc(*ppCounter, label_values); + } else { + uError("taosClusterCounterInc not found pCounter %s:%s.", clusterKey, counterName); + } + } else { + uError("taosClusterCounterInc not found pMonitor %s.", clusterKey); + } + taosRUnLockLatch(&monitorLock); + return 0; +} + +void clusterMonitorClose(const char* clusterKey) { + taosWLockLatch(&monitorLock); + ClientMonitor** ppMonitor = (ClientMonitor**)taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey)); + + if (ppMonitor != NULL && *ppMonitor != NULL) { + ClientMonitor* pMonitor = *ppMonitor; + uInfo("clusterMonitorClose valule:%p clusterKey:%s.", pMonitor, pMonitor->clusterKey); + taosHashCleanup(pMonitor->counters); + taos_collector_registry_destroy(pMonitor->registry); + taosMemoryFree(pMonitor); + taosHashRemove(clusterMonitorInfoTable, clusterKey, strlen(clusterKey)); + } + taosWUnLockLatch(&monitorLock); +}