Merge pull request #26263 from taosdata/fix/TS-4921

feat:[TS-4921] add slow log logic
This commit is contained in:
dapan1121 2024-06-25 17:07:35 +08:00 committed by GitHub
commit ccda3a7f0d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
43 changed files with 1592 additions and 596 deletions

View File

@ -24,10 +24,11 @@
extern "C" {
#endif
#define SLOW_LOG_TYPE_NULL 0x0
#define SLOW_LOG_TYPE_QUERY 0x1
#define SLOW_LOG_TYPE_INSERT 0x2
#define SLOW_LOG_TYPE_OTHERS 0x4
#define SLOW_LOG_TYPE_ALL 0xFFFFFFFF
#define SLOW_LOG_TYPE_ALL 0x7
typedef enum {
DND_CA_SM4 = 1,
@ -177,7 +178,10 @@ extern int32_t tsMaxRetryWaitTime;
extern bool tsUseAdapter;
extern int32_t tsMetaCacheMaxSize;
extern int32_t tsSlowLogThreshold;
extern int32_t tsSlowLogThresholdTest;
extern char tsSlowLogExceptDb[];
extern int32_t tsSlowLogScope;
extern int32_t tsSlowLogMaxLen;
extern int32_t tsTimeSeriesThreshold;
extern bool tsMultiResultFunctionStarReturnTags;

View File

@ -654,6 +654,16 @@ void tFreeSSubmitRsp(SSubmitRsp* pRsp);
#define SSCHMEA_BYTES(s) ((s)->bytes)
#define SSCHMEA_NAME(s) ((s)->name)
typedef struct {
bool tsEnableMonitor;
int32_t tsMonitorInterval;
int32_t tsSlowLogThreshold;
int32_t tsSlowLogMaxLen;
int32_t tsSlowLogScope;
int32_t tsSlowLogThresholdTest;
char tsSlowLogExceptDb[TSDB_DB_NAME_LEN];
} SMonitorParas;
typedef struct {
int32_t nCols;
int32_t version;
@ -968,6 +978,7 @@ typedef struct {
char sVer[TSDB_VERSION_LEN];
char sDetailVer[128];
int64_t whiteListVer;
SMonitorParas monitorParas;
} SConnectRsp;
int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
@ -1635,6 +1646,7 @@ typedef struct {
int8_t enableWhiteList;
int8_t encryptionKeyStat;
uint32_t encryptionKeyChksum;
SMonitorParas monitorParas;
} SClusterCfg;
typedef struct {
@ -1726,9 +1738,15 @@ int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
void tFreeSStatusReq(SStatusReq* pReq);
typedef enum {
MONITOR_TYPE_COUNTER = 0,
MONITOR_TYPE_SLOW_LOG = 1,
} MONITOR_TYPE;
typedef struct {
int32_t contLen;
char* pCont;
int32_t contLen;
char* pCont;
MONITOR_TYPE type;
} SStatisReq;
int32_t tSerializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq);
@ -3260,6 +3278,7 @@ typedef struct {
int64_t rspId;
int32_t svrTimestamp;
SArray* rsps; // SArray<SClientHbRsp>
SMonitorParas monitorParas;
} SClientHbBatchRsp;
static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { return taosIntHash_64(key, keyLen); }

View File

@ -206,7 +206,7 @@ int32_t catalogInit(SCatalogCfg* cfg);
* @param catalogHandle (output, NO need to free it)
* @return error code
*/
int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle);
int32_t catalogGetHandle(int64_t clusterId, SCatalog** catalogHandle);
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t* tableNum, int64_t* stateTs);

View File

@ -23,6 +23,7 @@ extern "C" {
#include "taos_monitor.h"
#include "thash.h"
#include "query.h"
#include "tqueue.h"
typedef enum SQL_RESULT_CODE {
SQL_RESULT_SUCCESS = 0,
@ -30,23 +31,41 @@ typedef enum SQL_RESULT_CODE {
SQL_RESULT_CANCEL = 2,
} SQL_RESULT_CODE;
const char* resultStr(SQL_RESULT_CODE code);
#define SLOW_LOG_SEND_SIZE 1024*1024
extern tsem2_t monitorSem;
extern STaosQueue* monitorQueue;
typedef struct {
char clusterKey[512];
SEpSet epSet;
void* pTransporter;
int64_t clusterId;
taos_collector_registry_t* registry;
taos_collector_t* colector;
SHashObj* counters;
} ClientMonitor;
void* timer;
} MonitorClient;
void clusterMonitorInit(const char* clusterKey, SEpSet epSet, void* pTransporter);
void clusterMonitorClose(const char* clusterKey);
taos_counter_t* createClusterCounter(const char* clusterKey, const char* name, const char* help, size_t label_key_count,
const char** label_keys);
int taosClusterCounterInc(const char* clusterKey, const char* counterName, const char** label_values);
void cluster_monitor_stop();
typedef struct {
TdFilePtr pFile;
void* timer;
} SlowLogClient;
typedef struct {
int64_t clusterId;
char *value;
} MonitorSlowLogData;
void monitorClose();
void monitorInit();
void monitorSendAllSlowLogFromTempDir(void* pInst);
void monitorClientSQLReqInit(int64_t clusterKey);
void monitorClientSlowQueryInit(int64_t clusterId);
void monitorCreateClient(int64_t clusterId);
void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count, const char** label_keys);
void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values);
void* monitorThreadFunc(void *param);
void monitorFreeSlowLogData(MonitorSlowLogData* pData);
const char* monitorResultStr(SQL_RESULT_CODE code);
void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *epSet);
#ifdef __cplusplus
}

View File

@ -227,7 +227,7 @@ void monSetSmInfo(SMonSmInfo *pInfo);
void monSetBmInfo(SMonBmInfo *pInfo);
void monGenAndSendReport();
void monGenAndSendReportBasic();
void monSendContent(char *pCont);
void monSendContent(char *pCont, const char* uri);
void tFreeSMonMmInfo(SMonMmInfo *pInfo);
void tFreeSMonVmInfo(SMonVmInfo *pInfo);

View File

@ -72,6 +72,9 @@ TdFilePtr taosCreateFile(const char *path, int32_t tdFileOptions);
#define TD_FILE_ACCESS_EXIST_OK 0x1
#define TD_FILE_ACCESS_READ_OK 0x2
#define TD_FILE_ACCESS_WRITE_OK 0x4
#define TD_TMP_FILE_PREFIX "tdengine-"
bool taosCheckAccessFile(const char *pathname, int mode);
int32_t taosLockFile(TdFilePtr pFile);

View File

@ -456,6 +456,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_DNODE_INVALID_LOCALE TAOS_DEF_ERROR_CODE(0, 0x0426)
#define TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR TAOS_DEF_ERROR_CODE(0, 0x0427)
#define TSDB_CODE_DNODE_INVALID_EN_WHITELIST TAOS_DEF_ERROR_CODE(0, 0x0428)
#define TSDB_CODE_DNODE_INVALID_MONITOR_PARAS TAOS_DEF_ERROR_CODE(0, 0x0429)
// mnode-sma
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)

View File

@ -573,7 +573,7 @@ enum {
#define TSDB_CONFIG_OPTION_LEN 32
#define TSDB_CONFIG_VALUE_LEN 64
#define TSDB_CONFIG_SCOPE_LEN 8
#define TSDB_CONFIG_NUMBER 8
#define TSDB_CONFIG_NUMBER 16
#define QUERY_ID_SIZE 20
#define QUERY_OBJ_ID_SIZE 18

View File

@ -115,10 +115,11 @@ struct SAppInstInfo {
SArray* pQnodeList;
SAppClusterSummary summary;
SList* pConnList; // STscObj linked list
uint64_t clusterId;
int64_t clusterId;
void* pTransporter;
SAppHbMgr* pAppHbMgr;
char* instKey;
SMonitorParas monitorParas;
};
typedef struct SAppInfo {
@ -127,6 +128,7 @@ typedef struct SAppInfo {
int32_t pid;
int32_t numOfThreads;
SHashObj* pInstMap;
SHashObj* pInstMapByClusterId;
TdThreadMutex mutex;
} SAppInfo;
@ -350,7 +352,7 @@ void* createTscObj(const char* user, const char* auth, const char* db, int32_
void destroyTscObj(void* pObj);
STscObj* acquireTscObj(int64_t rid);
int32_t releaseTscObj(int64_t rid);
void destroyAppInst(SAppInstInfo* pAppInfo);
void destroyAppInst(void* pAppInfo);
uint64_t generateRequestId();
@ -403,7 +405,7 @@ void hbRemoveAppHbMrg(SAppHbMgr** pAppHbMgr);
void destroyAllRequests(SHashObj* pRequests);
void stopAllRequests(SHashObj* pRequests);
SAppInstInfo* getAppInstInfo(const char* clusterKey);
//SAppInstInfo* getAppInstInfo(const char* clusterKey);
// conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
@ -441,10 +443,8 @@ void freeQueryParam(SSyncQueryParam* param);
int32_t clientParseSqlImpl(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effeciveUser, SParseSqlRes* pRes);
#endif
void clientSlowQueryMonitorInit(const char* clusterKey);
void SlowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost);
void clientSQLReqMonitorInit(const char* clusterKey);
void slowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost);
enum {
MONITORSQLTYPESELECT = 0,
@ -454,8 +454,6 @@ enum {
void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type);
void clientMonitorClose(const char* clusterKey);
#ifdef __cplusplus
}
#endif

View File

@ -13,9 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <ttimer.h>
#include "cJSON.h"
#include "catalog.h"
#include "clientInt.h"
#include "clientLog.h"
#include "clientMonitor.h"
#include "functionMgt.h"
#include "os.h"
#include "osSleep.h"
@ -26,6 +29,7 @@
#include "tglobal.h"
#include "thttp.h"
#include "tmsg.h"
#include "tqueue.h"
#include "tref.h"
#include "trpc.h"
#include "tsched.h"
@ -70,6 +74,125 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
return TSDB_CODE_SUCCESS;
}
static void concatStrings(SArray *list, char* buf, int size){
int len = 0;
for(int i = 0; i < taosArrayGetSize(list); i++){
char* db = taosArrayGet(list, i);
char* dot = strchr(db, '.');
if (dot != NULL) {
db = dot + 1;
}
if (i != 0){
strcat(buf, ",");
len += 1;
}
int ret = snprintf(buf + len, size - len, "%s", db);
if (ret < 0) {
tscError("snprintf failed, buf:%s, ret:%d", buf, ret);
break;
}
len += ret;
if (len >= size){
tscInfo("dbList is truncated, buf:%s, len:%d", buf, len);
break;
}
}
}
static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_t reqType, int64_t duration){
cJSON* json = cJSON_CreateObject();
if (json == NULL) {
tscError("[monitor] cJSON_CreateObject failed");
return;
}
char clusterId[32] = {0};
if (snprintf(clusterId, sizeof(clusterId), "%" PRId64, pTscObj->pAppInfo->clusterId) < 0){
uError("failed to generate clusterId:%" PRId64, pTscObj->pAppInfo->clusterId);
}
char startTs[32] = {0};
if (snprintf(startTs, sizeof(startTs), "%" PRId64, pRequest->metric.start/1000) < 0){
uError("failed to generate startTs:%" PRId64, pRequest->metric.start/1000);
}
char requestId[32] = {0};
if (snprintf(requestId, sizeof(requestId), "%" PRIu64, pRequest->requestId) < 0){
uError("failed to generate requestId:%" PRIu64, pRequest->requestId);
}
cJSON_AddItemToObject(json, "cluster_id", cJSON_CreateString(clusterId));
cJSON_AddItemToObject(json, "start_ts", cJSON_CreateString(startTs));
cJSON_AddItemToObject(json, "request_id", cJSON_CreateString(requestId));
cJSON_AddItemToObject(json, "query_time", cJSON_CreateNumber(duration/1000));
cJSON_AddItemToObject(json, "code", cJSON_CreateNumber(pRequest->code));
cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code)));
cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType));
cJSON_AddItemToObject(json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.totalRows));
if(strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen){
char tmp = pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen];
pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = '\0';
cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr));
pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = tmp;
}else{
cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr));
}
cJSON_AddItemToObject(json, "user", cJSON_CreateString(pTscObj->user));
cJSON_AddItemToObject(json, "process_name", cJSON_CreateString(appInfo.appName));
cJSON_AddItemToObject(json, "ip", cJSON_CreateString(tsLocalFqdn));
char pid[32] = {0};
if (snprintf(pid, sizeof(pid), "%d", appInfo.pid) < 0){
uError("failed to generate pid:%d", appInfo.pid);
}
cJSON_AddItemToObject(json, "process_id", cJSON_CreateString(pid));
if(pRequest->dbList != NULL){
char dbList[1024] = {0};
concatStrings(pRequest->dbList, dbList, sizeof(dbList) - 1);
cJSON_AddItemToObject(json, "db", cJSON_CreateString(dbList));
}else if(pRequest->pDb != NULL){
cJSON_AddItemToObject(json, "db", cJSON_CreateString(pRequest->pDb));
}else{
cJSON_AddItemToObject(json, "db", cJSON_CreateString("unknown"));
}
MonitorSlowLogData* slowLogData = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0);
if (slowLogData == NULL) {
cJSON_Delete(json);
tscError("[monitor] failed to allocate slow log data");
return;
}
slowLogData->clusterId = pTscObj->pAppInfo->clusterId;
slowLogData->value = cJSON_PrintUnformatted(json);
tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value);
if (taosWriteQitem(monitorQueue, slowLogData) == 0){
tsem2_post(&monitorSem);
}else{
monitorFreeSlowLogData(slowLogData);
taosFreeQitem(slowLogData);
}
cJSON_Delete(json);
}
static bool checkSlowLogExceptDb(SRequestObj *pRequest, char* exceptDb) {
if (pRequest->pDb != NULL) {
return strcmp(pRequest->pDb, exceptDb) != 0;
}
for (int i = 0; i < taosArrayGetSize(pRequest->dbList); i++) {
char *db = taosArrayGet(pRequest->dbList, i);
char *dot = strchr(db, '.');
if (dot != NULL) {
db = dot + 1;
}
if(strcmp(db, exceptDb) == 0){
return false;
}
}
return true;
}
static void deregisterRequest(SRequestObj *pRequest) {
if (pRequest == NULL) {
tscError("pRequest == NULL");
@ -113,21 +236,27 @@ static void deregisterRequest(SRequestObj *pRequest) {
nodesSimReleaseAllocator(pRequest->allocatorRefId);
}
if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) {
sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPESELECT);
} else if (QUERY_NODE_DELETE_STMT == pRequest->stmtType) {
sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEDELETE);
if(pTscObj->pAppInfo->monitorParas.tsEnableMonitor){
if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) {
sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPESELECT);
} else if (QUERY_NODE_DELETE_STMT == pRequest->stmtType) {
sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEDELETE);
}
}
if (duration >= (tsSlowLogThreshold * 1000000UL)) {
if ((duration >= pTscObj->pAppInfo->monitorParas.tsSlowLogThreshold * 1000000UL || duration >= pTscObj->pAppInfo->monitorParas.tsSlowLogThresholdTest * 1000000UL) &&
checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->monitorParas.tsSlowLogExceptDb)) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
if (tsSlowLogScope & reqType) {
taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 ", Duration:%" PRId64 "us, SQL:%s",
if (pTscObj->pAppInfo->monitorParas.tsSlowLogScope & reqType) {
taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 " us, Duration:%" PRId64 "us, SQL:%s",
taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
pRequest->sqlstr);
SlowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration);
if(pTscObj->pAppInfo->monitorParas.tsEnableMonitor){
slowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration);
generateWriteSlowLog(pTscObj, pRequest, reqType, duration);
}
}
}
@ -233,14 +362,13 @@ void stopAllRequests(SHashObj *pRequests) {
}
}
void destroyAppInst(SAppInstInfo *pAppInfo) {
void destroyAppInst(void *info) {
SAppInstInfo* pAppInfo = *(SAppInstInfo**)info;
tscDebug("destroy app inst mgr %p", pAppInfo);
taosThreadMutexLock(&appInfo.mutex);
clientMonitorClose(pAppInfo->instKey);
hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
taosHashRemove(appInfo.pInstMap, pAppInfo->instKey, strlen(pAppInfo->instKey));
taosThreadMutexUnlock(&appInfo.mutex);
@ -477,13 +605,11 @@ void doDestroyRequest(void *p) {
destorySqlCallbackWrapper(pRequest->pWrapper);
taosMemoryFreeClear(pRequest->msgBuf);
taosMemoryFreeClear(pRequest->pDb);
doFreeReqResultInfo(&pRequest->body.resInfo);
tsem_destroy(&pRequest->body.rspSem);
taosArrayDestroy(pRequest->tableList);
taosArrayDestroy(pRequest->dbList);
taosArrayDestroy(pRequest->targetTableList);
destroyQueryExecRes(&pRequest->body.resInfo.execRes);
@ -492,6 +618,8 @@ void doDestroyRequest(void *p) {
deregisterRequest(pRequest);
}
taosMemoryFreeClear(pRequest->pDb);
taosArrayDestroy(pRequest->dbList);
if (pRequest->body.interParam) {
tsem_destroy(&((SSyncQueryParam *)pRequest->body.interParam)->sem);
}
@ -670,7 +798,7 @@ void tscStopCrashReport() {
}
if (atomic_val_compare_exchange_32(&clientStop, 0, 1)) {
tscDebug("hb thread already stopped");
tscDebug("crash report thread already stopped");
return;
}
@ -719,7 +847,8 @@ void taos_init_imp(void) {
appInfo.pid = taosGetPId();
appInfo.startTime = taosGetTimestampMs();
appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
appInfo.pInstMapByClusterId = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst);
deltaToUtcInitOnce();
char logDirName[64] = {0};
@ -769,6 +898,7 @@ void taos_init_imp(void) {
taosThreadMutexInit(&appInfo.mutex, NULL);
tscCrashReportInit();
monitorInit();
tscDebug("client is initialized successfully");
}

View File

@ -18,6 +18,7 @@
#include "clientLog.h"
#include "scheduler.h"
#include "trpc.h"
#include "tglobal.h"
typedef struct {
union {
@ -67,7 +68,7 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC
}
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) {
uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
for (int i = 0; i < TARRAY_SIZE(clientHbMgr.appHbMgrs); ++i) {
SAppHbMgr *hbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (!hbMgr || hbMgr->pAppInstInfo->clusterId != clusterId) {
@ -545,6 +546,9 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
}
SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo;
pInst->monitorParas = pRsp.monitorParas;
tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d",
pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);
if (code != 0) {
pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1;
@ -1129,7 +1133,7 @@ int32_t hbGatherAppInfo(void) {
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == NULL) continue;
uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
if (NULL == pApp) {
memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));

View File

@ -147,7 +147,7 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
}
p->pAppHbMgr = appHbMgrInit(p, key);
if (NULL == p->pAppHbMgr) {
destroyAppInst(p);
destroyAppInst(&p);
taosThreadMutexUnlock(&appInfo.mutex);
taosMemoryFreeClear(key);
return NULL;
@ -158,9 +158,6 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
tscDebug("new app inst mgr %p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port);
pInst = &p;
clientSlowQueryMonitorInit(p->instKey);
clientSQLReqMonitorInit(p->instKey);
} else {
ASSERTS((*pInst) && (*pInst)->pAppHbMgr, "*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
// reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
@ -174,14 +171,14 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType);
}
SAppInstInfo* getAppInstInfo(const char* clusterKey) {
SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
return *ppAppInstInfo;
} else {
return NULL;
}
}
//SAppInstInfo* getAppInstInfo(const char* clusterKey) {
// SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
// if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
// return *ppAppInstInfo;
// } else {
// return NULL;
// }
//}
void freeQueryParam(SSyncQueryParam* param) {
if (param == NULL) return;

View File

@ -17,6 +17,7 @@
#include "clientInt.h"
#include "clientLog.h"
#include "clientStmt.h"
#include "clientMonitor.h"
#include "functionMgt.h"
#include "os.h"
#include "query.h"
@ -55,6 +56,9 @@ void taos_cleanup(void) {
return;
}
monitorClose();
taosHashCleanup(appInfo.pInstMap);
taosHashCleanup(appInfo.pInstMapByClusterId);
tscStopCrashReport();
hbMgrCleanUp();
@ -279,7 +283,6 @@ void taos_close_internal(void *taos) {
STscObj *pTscObj = (STscObj *)taos;
tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs);
// clientMonitorClose(pTscObj->pAppInfo->instKey);
taosRemoveRef(clientConnRefPool, pTscObj->id);
}

View File

@ -0,0 +1,652 @@
#include "clientMonitor.h"
#include "os.h"
#include "tmisce.h"
#include "ttime.h"
#include "ttimer.h"
#include "tglobal.h"
#include "tqueue.h"
#include "cJSON.h"
#include "clientInt.h"
SRWLatch monitorLock;
void* monitorTimer;
SHashObj* monitorCounterHash;
int32_t slowLogFlag = -1;
int32_t monitorFlag = -1;
tsem2_t monitorSem;
STaosQueue* monitorQueue;
SHashObj* monitorSlowLogHash;
static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size){
if (tsTempDir == NULL) {
return -1;
}
int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir);
if (ret < 0){
uError("failed to get tmp path ret:%d", ret);
return ret;
}
return 0;
}
//static void destroyCounter(void* data){
// if (data == NULL) {
// return;
// }
// taos_counter_t* conuter = *(taos_counter_t**)data;
// if(conuter == NULL){
// return;
// }
// taos_counter_destroy(conuter);
//}
static void destroySlowLogClient(void* data){
if (data == NULL) {
return;
}
SlowLogClient* slowLogClient = *(SlowLogClient**)data;
if(slowLogClient == NULL){
return;
}
taosTmrStopA(&(*(SlowLogClient**)data)->timer);
TdFilePtr pFile = slowLogClient->pFile;
if(pFile == NULL){
taosMemoryFree(slowLogClient);
return;
}
taosUnLockFile(pFile);
taosCloseFile(&pFile);
taosMemoryFree(slowLogClient);
}
static void destroyMonitorClient(void* data){
if (data == NULL) {
return;
}
MonitorClient* pMonitor = *(MonitorClient**)data;
if(pMonitor == NULL){
return;
}
taosTmrStopA(&pMonitor->timer);
taosHashCleanup(pMonitor->counters);
taos_collector_registry_destroy(pMonitor->registry);
// taos_collector_destroy(pMonitor->colector);
taosMemoryFree(pMonitor);
}
static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) {
void *p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES);
if(p == NULL){
uError("failed to get app inst, clusterId:%" PRIx64, clusterId);
return NULL;
}
return *(SAppInstInfo**)p;
}
static int32_t tscMonitortInit() {
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
TdThread monitorThread;
if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) {
uError("failed to create monitor thread since %s", strerror(errno));
return -1;
}
taosThreadAttrDestroy(&thAttr);
return 0;
}
static void tscMonitorStop() {
if (atomic_val_compare_exchange_32(&slowLogFlag, 0, 1)) {
uDebug("monitor thread already stopped");
return;
}
while (atomic_load_32(&slowLogFlag) > 0) {
taosMsleep(100);
}
}
static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
if (TSDB_CODE_SUCCESS != code) {
uError("found error in monitorReport send callback, code:%d, please check the network.", code);
}
if (pMsg) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
}
return code;
}
static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITOR_TYPE type) {
SStatisReq sStatisReq;
sStatisReq.pCont = pCont;
sStatisReq.contLen = strlen(pCont);
sStatisReq.type = type;
int tlen = tSerializeSStatisReq(NULL, 0, &sStatisReq);
if (tlen < 0) return 0;
void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
uError("sendReport failed, out of memory, len:%d", tlen);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSStatisReq(buf, tlen, &sStatisReq);
SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pInfo == NULL) {
uError("sendReport failed, out of memory send info");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pInfo->fp = monitorReportAsyncCB;
pInfo->msgInfo.pData = buf;
pInfo->msgInfo.len = tlen;
pInfo->msgType = TDMT_MND_STATIS;
// pInfo->param = taosMemoryMalloc(sizeof(int32_t));
// *(int32_t*)pInfo->param = i;
pInfo->paramFreeFp = taosMemoryFree;
pInfo->requestId = tGenIdPI64();
pInfo->requestObjRefId = 0;
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo);
if (code != TSDB_CODE_SUCCESS) {
uError("sendReport failed, code:%d", code);
}
return code;
}
void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *epSet){
char buf[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log
char pCont[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log
int32_t offset = 0;
if(taosLSeekFile(pFile, 0, SEEK_SET) < 0){
uError("failed to seek file:%p code: %d", pFile, errno);
return;
}
while(1){
int64_t readSize = taosReadFile(pFile, buf + offset, SLOW_LOG_SEND_SIZE - offset);
if (readSize <= 0) {
uError("failed to read len from file:%p since %s", pFile, terrstr());
return;
}
memset(pCont, 0, sizeof(pCont));
strcat(pCont, "[");
char* string = buf;
for(int i = 0; i < readSize + offset; i++){
if (buf[i] == '\0') {
if (string != buf) strcat(pCont, ",");
strcat(pCont, string);
uDebug("[monitor] monitorReadSendSlowLog slow log:%s", string);
string = buf + i + 1;
}
}
strcat(pCont, "]");
if (pTransporter && pCont != NULL) {
if(sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_SLOW_LOG) != 0){
if(taosLSeekFile(pFile, -readSize, SEEK_CUR) < 0){
uError("failed to seek file:%p code: %d", pFile, errno);
}
uError("failed to send report:%s", pCont);
return;
}
uDebug("[monitor] monitorReadSendSlowLog send slow log to mnode:%s", pCont)
}
if (readSize + offset < SLOW_LOG_SEND_SIZE) {
break;
}
offset = SLOW_LOG_SEND_SIZE - (string - buf);
if(buf != string && offset != 0){
memmove(buf, string, offset);
uDebug("[monitor] monitorReadSendSlowLog left slow log:%s", buf)
}
}
if(taosFtruncateFile(pFile, 0) < 0){
uError("failed to truncate file:%p code: %d", pFile, errno);
}
uDebug("[monitor] monitorReadSendSlowLog send slow log file:%p", pFile);
}
static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet *epSet) {
char ts[50] = {0};
sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI));
char* pCont = (char*)taos_collector_registry_bridge_new(registry, ts, "%" PRId64, NULL);
if(NULL == pCont) {
uError("generateClusterReport failed, get null content.");
return;
}
if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER) == 0) {
taos_collector_registry_clear_batch(registry);
}
taosMemoryFreeClear(pCont);
}
static void reportSendProcess(void* param, void* tmrId) {
taosRLockLatch(&monitorLock);
if (atomic_load_32(&monitorFlag) == 1) {
taosRUnLockLatch(&monitorLock);
return;
}
MonitorClient* pMonitor = (MonitorClient*)param;
SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId);
if(pInst == NULL){
taosRUnLockLatch(&monitorLock);
return;
}
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
taosRUnLockLatch(&monitorLock);
taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
}
static void sendAllSlowLog(){
void* data = taosHashIterate(monitorSlowLogHash, NULL);
while (data != NULL) {
TdFilePtr pFile = (*(SlowLogClient**)data)->pFile;
if (pFile != NULL){
int64_t clusterId = *(int64_t*)taosHashGetKey(data, NULL);
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
if(pInst == NULL){
taosHashCancelIterate(monitorSlowLogHash, data);
break;
}
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
monitorReadSendSlowLog(pFile, pInst->pTransporter, &ep);
}
data = taosHashIterate(monitorSlowLogHash, data);
}
uDebug("[monitor] sendAllSlowLog when client close");
}
void monitorSendAllSlowLogFromTempDir(void* inst){
SAppInstInfo* pInst = (SAppInstInfo*)inst;
if(pInst == NULL || !pInst->monitorParas.tsEnableMonitor){
uInfo("[monitor] monitor is disabled, skip send slow log");
return;
}
char namePrefix[PATH_MAX] = {0};
if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, pInst->clusterId) < 0) {
uError("failed to generate slow log file name prefix");
return;
}
taosRLockLatch(&monitorLock);
char tmpPath[PATH_MAX] = {0};
if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) {
goto END;
}
TdDirPtr pDir = taosOpenDir(tmpPath);
if (pDir == NULL) {
goto END;
}
TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) {
if (taosDirEntryIsDir(de)) {
continue;
}
char *name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 ||
strcmp(name, "..") == 0 ||
strstr(name, namePrefix) == NULL) {
uInfo("skip file:%s, for cluster id:%"PRIx64, name, pInst->clusterId);
continue;
}
char filename[PATH_MAX] = {0};
snprintf(filename, sizeof(filename), "%s%s", tmpPath, name);
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ);
if (pFile == NULL) {
uError("failed to open file:%s since %s", filename, terrstr());
continue;
}
if (taosLockFile(pFile) < 0) {
uError("failed to lock file:%s since %s, maybe used by other process", filename, terrstr());
taosCloseFile(&pFile);
continue;
}
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
monitorReadSendSlowLog(pFile, pInst->pTransporter, &ep);
taosUnLockFile(pFile);
taosCloseFile(&pFile);
taosRemoveFile(filename);
uDebug("[monitor] send and delete slow log file when reveive connect rsp:%s", filename);
}
taosCloseDir(&pDir);
END:
taosRUnLockLatch(&monitorLock);
}
static void sendAllCounter(){
MonitorClient** ppMonitor = (MonitorClient**)taosHashIterate(monitorCounterHash, NULL);
while (ppMonitor != NULL) {
MonitorClient* pMonitor = *ppMonitor;
if (pMonitor != NULL){
SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId);
if(pInst == NULL){
taosHashCancelIterate(monitorCounterHash, ppMonitor);
break;
}
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
}
ppMonitor = taosHashIterate(monitorCounterHash, ppMonitor);
}
}
void monitorInit() {
uInfo("[monitor] tscMonitor init");
monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (monitorCounterHash == NULL) {
uError("failed to create monitorCounterHash");
}
taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient);
monitorSlowLogHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (monitorSlowLogHash == NULL) {
uError("failed to create monitorSlowLogHash");
}
taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient);
monitorTimer = taosTmrInit(0, 0, 0, "MONITOR");
if (monitorTimer == NULL) {
uError("failed to create monitor timer");
}
taosInitRWLatch(&monitorLock);
tscMonitortInit();
}
void monitorClose() {
uInfo("[monitor] tscMonitor close");
taosRLockLatch(&monitorLock);
if (atomic_val_compare_exchange_32(&monitorFlag, 0, 1)) {
uDebug("[monitor] monitorFlag is not 0");
}
tscMonitorStop();
sendAllSlowLog();
sendAllCounter();
taosHashCleanup(monitorCounterHash);
taosHashCleanup(monitorSlowLogHash);
taosTmrCleanUp(monitorTimer);
taosRUnLockLatch(&monitorLock);
}
void monitorCreateClient(int64_t clusterId) {
MonitorClient* pMonitor = NULL;
taosWLockLatch(&monitorLock);
if (taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES) == NULL) {
uInfo("[monitor] monitorCreateClient for %" PRIx64, clusterId);
pMonitor = taosMemoryCalloc(1, sizeof(MonitorClient));
if (pMonitor == NULL) {
uError("failed to create monitor client");
goto fail;
}
pMonitor->clusterId = clusterId;
char clusterKey[32] = {0};
if(snprintf(clusterKey, sizeof(clusterKey), "%"PRId64, clusterId) < 0){
uError("failed to create cluster key");
goto fail;
}
pMonitor->registry = taos_collector_registry_new(clusterKey);
if(pMonitor->registry == NULL){
uError("failed to create registry");
goto fail;
}
pMonitor->colector = taos_collector_new(clusterKey);
if(pMonitor->colector == NULL){
uError("failed to create collector");
goto fail;
}
taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector);
pMonitor->counters = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pMonitor->counters == NULL) {
uError("failed to create monitor counters");
goto fail;
}
// taosHashSetFreeFp(pMonitor->counters, destroyCounter);
if(taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0){
uError("failed to put monitor client to hash");
goto fail;
}
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
if(pInst == NULL){
uError("failed to get app instance by cluster id");
pMonitor = NULL;
goto fail;
}
pMonitor->timer = taosTmrStart(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, (void*)pMonitor, monitorTimer);
if(pMonitor->timer == NULL){
uError("failed to start timer");
goto fail;
}
uInfo("[monitor] monitorCreateClient for %"PRIx64 "finished %p.", clusterId, pMonitor);
}
taosWUnLockLatch(&monitorLock);
if (-1 != atomic_val_compare_exchange_32(&monitorFlag, -1, 0)) {
uDebug("[monitor] monitorFlag already is 0");
}
return;
fail:
destroyMonitorClient(&pMonitor);
taosWUnLockLatch(&monitorLock);
}
void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count, const char** label_keys) {
taosWLockLatch(&monitorLock);
MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
if (ppMonitor == NULL || *ppMonitor == NULL) {
uError("failed to get monitor client");
goto end;
}
taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys);
if (newCounter == NULL)
return;
MonitorClient* pMonitor = *ppMonitor;
taos_collector_add_metric(pMonitor->colector, newCounter);
if(taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0){
uError("failed to put counter to monitor");
taos_counter_destroy(newCounter);
goto end;
}
uInfo("[monitor] monitorCreateClientCounter %"PRIx64"(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, newCounter);
end:
taosWUnLockLatch(&monitorLock);
}
void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values) {
taosRLockLatch(&monitorLock);
MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
if (ppMonitor == NULL || *ppMonitor == NULL) {
uError("monitorCounterInc not found pMonitor %"PRId64, clusterId);
goto end;
}
MonitorClient* pMonitor = *ppMonitor;
taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName));
if (ppCounter == NULL || *ppCounter != NULL) {
uError("monitorCounterInc not found pCounter %"PRIx64":%s.", clusterId, counterName);
goto end;
}
taos_counter_inc(*ppCounter, label_values);
uInfo("[monitor] monitorCounterInc %"PRIx64"(%p):%s", pMonitor->clusterId, pMonitor, counterName);
end:
taosRUnLockLatch(&monitorLock);
}
const char* monitorResultStr(SQL_RESULT_CODE code) {
static const char* result_state[] = {"Success", "Failed", "Cancel"};
return result_state[code];
}
void monitorFreeSlowLogData(MonitorSlowLogData* pData) {
if (pData == NULL) {
return;
}
taosMemoryFree(pData->value);
}
void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); }
void reportSlowLog(void* param, void* tmrId) {
taosRLockLatch(&monitorLock);
if (atomic_load_32(&monitorFlag) == 1) {
taosRUnLockLatch(&monitorLock);
return;
}
SAppInstInfo* pInst = getAppInstByClusterId((int64_t)param);
if(pInst == NULL){
uError("failed to get app inst, clusterId:%"PRIx64, (int64_t)param);
taosRUnLockLatch(&monitorLock);
return;
}
void* tmp = taosHashGet(monitorSlowLogHash, &param, LONG_BYTES);
if(tmp == NULL){
uError("failed to get file inst, clusterId:%"PRIx64, (int64_t)param);
taosRUnLockLatch(&monitorLock);
return;
}
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
monitorReadSendSlowLog((*(SlowLogClient**)tmp)->pFile, pInst->pTransporter, &ep);
taosRUnLockLatch(&monitorLock);
taosTmrReset(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
}
void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpPath){
taosRLockLatch(&monitorLock);
TdFilePtr pFile = NULL;
void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES);
if (tmp == NULL){
char path[PATH_MAX] = {0};
char clusterId[32] = {0};
if (snprintf(clusterId, sizeof(clusterId), "%" PRIx64, slowLogData->clusterId) < 0){
uError("failed to generate clusterId:%" PRIx64, slowLogData->clusterId);
goto FAILED;
}
taosGetTmpfilePath(tmpPath, clusterId, path);
uInfo("[monitor] create slow log file:%s", path);
pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to open file:%s since %s", path, terrstr());
goto FAILED;
}
SlowLogClient *pClient = taosMemoryCalloc(1, sizeof(SlowLogClient));
if (pClient == NULL){
uError("failed to allocate memory for slow log client");
taosCloseFile(&pFile);
goto FAILED;
}
pClient->pFile = pFile;
if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0){
uError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId);
taosCloseFile(&pFile);
taosMemoryFree(pClient);
goto FAILED;
}
if(taosLockFile(pFile) < 0){
uError("failed to lock file:%p since %s", pFile, terrstr());
goto FAILED;
}
SAppInstInfo* pInst = getAppInstByClusterId(slowLogData->clusterId);
if(pInst == NULL){
uError("failed to get app instance by clusterId:%" PRId64, slowLogData->clusterId);
goto FAILED;
}
pClient->timer = taosTmrStart(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, (void*)slowLogData->clusterId, monitorTimer);
}else{
pFile = (*(SlowLogClient**)tmp)->pFile;
}
if (taosWriteFile(pFile, slowLogData->value, strlen(slowLogData->value) + 1) < 0){
uError("failed to write len to file:%p since %s", pFile, terrstr());
}
uDebug("[monitor] write slow log to file:%p, clusterId:%"PRIx64, pFile, slowLogData->clusterId);
FAILED:
taosRUnLockLatch(&monitorLock);
}
void* monitorThreadFunc(void *param){
setThreadName("client-monitor-slowlog");
#ifdef WINDOWS
if (taosCheckCurrentInDll()) {
atexit(monitorThreadFuncUnexpectedStopped);
}
#endif
if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) {
return NULL;
}
char tmpPath[PATH_MAX] = {0};
if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0){
return NULL;
}
if (taosMulModeMkDir(tmpPath, 0777, true) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
printf("failed to create dir:%s since %s", tmpPath, terrstr());
return NULL;
}
if (tsem2_init(&monitorSem, 0, 0) != 0) {
uError("sem init error since %s", terrstr());
return NULL;
}
monitorQueue = taosOpenQueue();
if(monitorQueue == NULL){
uError("open queue error since %s", terrstr());
return NULL;
}
while (1) {
if (slowLogFlag > 0) break;
MonitorSlowLogData* slowLogData = NULL;
taosReadQitem(monitorQueue, (void**)&slowLogData);
if (slowLogData != NULL) {
uDebug("[monitor] read slow log data from queue, clusterId:%" PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value);
monitorWriteSlowLog2File(slowLogData, tmpPath);
}
monitorFreeSlowLogData(slowLogData);
taosFreeQitem(slowLogData);
tsem2_timewait(&monitorSem, 500);
}
taosCloseQueue(monitorQueue);
tsem2_destroy(&monitorSem);
slowLogFlag = -2;
return NULL;
}

View File

@ -21,7 +21,6 @@ const char* slowQueryName = "taos_slow_sql:count";
const char* slowQueryHelp = "slow query log when cost over than config duration";
const int slowQueryLabelCount = 4;
const char* slowQueryLabels[] = {"cluster_id", "username", "result", "duration"};
static const char* defaultClusterID = "";
const int64_t usInSeconds = 1000 * 1000;
const int64_t msInMinutes = 60 * 1000;
@ -39,21 +38,21 @@ static const char* getSlowQueryLableCostDesc(int64_t cost) {
return "0-3s";
}
void clientSlowQueryMonitorInit(const char* clusterKey) {
if (!tsEnableMonitor) return;
SAppInstInfo* pAppInstInfo = getAppInstInfo(clusterKey);
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
clusterMonitorInit(clusterKey, epSet, pAppInstInfo->pTransporter);
createClusterCounter(clusterKey, slowQueryName, slowQueryHelp, slowQueryLabelCount, slowQueryLabels);
void monitorClientSlowQueryInit(int64_t clusterid) {
monitorCreateClient(clusterid);
monitorCreateClientCounter(clusterid, slowQueryName, slowQueryHelp, slowQueryLabelCount, slowQueryLabels);
}
void clientSlowQueryLog(const char* clusterKey, const char* user, SQL_RESULT_CODE result, int32_t cost) {
const char* slowQueryLabelValues[] = {defaultClusterID, user, resultStr(result), getSlowQueryLableCostDesc(cost)};
taosClusterCounterInc(clusterKey, slowQueryName, slowQueryLabelValues);
void clientSlowQueryLog(int64_t clusterId, const char* user, SQL_RESULT_CODE result, int32_t cost) {
char clusterIdStr[32] = {0};
if (snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId) < 0){
uError("failed to generate clusterId:%" PRId64, clusterId);
}
const char* slowQueryLabelValues[] = {clusterIdStr, user, monitorResultStr(result), getSlowQueryLableCostDesc(cost)};
monitorCounterInc(clusterId, slowQueryName, slowQueryLabelValues);
}
void SlowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost) {
if (!tsEnableMonitor) return;
void slowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost) {
SQL_RESULT_CODE result = SQL_RESULT_SUCCESS;
if (TSDB_CODE_SUCCESS != code) {
result = SQL_RESULT_FAILED;
@ -66,12 +65,12 @@ void SlowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost) {
STscObj* pTscObj = acquireTscObj(rid);
if (pTscObj != NULL) {
if(pTscObj->pAppInfo == NULL) {
tscLog("SlowQueryLog, not found pAppInfo");
tscLog("slowQueryLog, not found pAppInfo");
} else {
clientSlowQueryLog(pTscObj->pAppInfo->instKey, pTscObj->user, result, cost);
clientSlowQueryLog(pTscObj->pAppInfo->clusterId, pTscObj->user, result, cost);
}
releaseTscObj(rid);
} else {
tscLog("SlowQueryLog, not found rid");
tscLog("slowQueryLog, not found rid");
}
}

View File

@ -22,17 +22,12 @@ const char* selectMonitorHelp = "count for select sql";
const int selectMonitorLabelCount = 4;
const char* selectMonitorLabels[] = {"cluster_id", "sql_type", "username", "result"};
static const char* defaultClusterID = "";
void clientSQLReqMonitorInit(const char* clusterKey) {
if (!tsEnableMonitor) return;
SAppInstInfo* pAppInstInfo = getAppInstInfo(clusterKey);
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
clusterMonitorInit(clusterKey, epSet, pAppInstInfo->pTransporter);
createClusterCounter(clusterKey, selectMonitorName, selectMonitorHelp, selectMonitorLabelCount, selectMonitorLabels);
void monitorClientSQLReqInit(int64_t clusterId) {
monitorCreateClient(clusterId);
monitorCreateClientCounter(clusterId, selectMonitorName, selectMonitorHelp, selectMonitorLabelCount, selectMonitorLabels);
}
void clientSQLReqLog(const char* clusterKey, const char* user, SQL_RESULT_CODE result, int8_t type) {
void clientSQLReqLog(int64_t clusterId, const char* user, SQL_RESULT_CODE result, int8_t type) {
const char* typeStr;
switch (type) {
case MONITORSQLTYPEDELETE:
@ -45,12 +40,15 @@ void clientSQLReqLog(const char* clusterKey, const char* user, SQL_RESULT_CODE r
typeStr = "select";
break;
}
const char* selectMonitorLabelValues[] = {defaultClusterID, typeStr, user, resultStr(result)};
taosClusterCounterInc(clusterKey, selectMonitorName, selectMonitorLabelValues);
char clusterIdStr[32] = {0};
if (snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId) < 0){
uError("failed to generate clusterId:%" PRId64, clusterId);
}
const char* selectMonitorLabelValues[] = {clusterIdStr, typeStr, user, monitorResultStr(result)};
monitorCounterInc(clusterId, selectMonitorName, selectMonitorLabelValues);
}
void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type) {
if (!tsEnableMonitor) return;
SQL_RESULT_CODE result = SQL_RESULT_SUCCESS;
if (TSDB_CODE_SUCCESS != code) {
result = SQL_RESULT_FAILED;
@ -65,15 +63,10 @@ void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type) {
if (pTscObj->pAppInfo == NULL) {
tscLog("sqlReqLog, not found pAppInfo");
} else {
clientSQLReqLog(pTscObj->pAppInfo->instKey, pTscObj->user, result, type);
clientSQLReqLog(pTscObj->pAppInfo->clusterId, pTscObj->user, result, type);
}
releaseTscObj(rid);
} else {
tscLog("sqlReqLog, not found rid");
}
}
void clientMonitorClose(const char* clusterKey) {
tscLog("clientMonitorClose, key:%s", clusterKey);
clusterMonitorClose(clusterKey);
}

View File

@ -15,6 +15,7 @@
#include "catalog.h"
#include "clientInt.h"
#include "clientMonitor.h"
#include "clientLog.h"
#include "cmdnodes.h"
#include "os.h"
@ -140,6 +141,9 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
// update the appInstInfo
pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
pTscObj->pAppInfo->monitorParas = connectRsp.monitorParas;
tscDebug("[monitor] paras from connect rsp, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d",
connectRsp.clusterId, connectRsp.monitorParas.tsSlowLogThreshold, connectRsp.monitorParas.tsSlowLogScope);
lastClusterId = connectRsp.clusterId;
pTscObj->connType = connectRsp.connType;
@ -147,6 +151,15 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
pTscObj->authVer = connectRsp.authVer;
pTscObj->whiteListInfo.ver = connectRsp.whiteListVer;
if(taosHashGet(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES) == NULL){
if(taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo, POINTER_BYTES) != 0){
tscError("failed to put appInfo into appInfo.pInstMapByClusterId");
}
monitorSendAllSlowLogFromTempDir(pTscObj->pAppInfo);
monitorClientSlowQueryInit(connectRsp.clusterId);
monitorClientSQLReqInit(connectRsp.clusterId);
}
taosThreadMutexLock(&clientHbMgr.lock);
SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx);
if (pAppHbMgr) {
@ -233,7 +246,7 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
struct SCatalog* pCatalog = NULL;
if (usedbRsp.vgVersion >= 0) { // cached in local
uint64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
int64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
int32_t code1 = catalogGetHandle(clusterId, &pCatalog);
if (code1 != TSDB_CODE_SUCCESS) {
tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId,

View File

@ -1851,7 +1851,7 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
}
void smlSetReqSQL(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd) {
if (tsSlowLogScope & SLOW_LOG_TYPE_INSERT) {
if (request->pTscObj->pAppInfo->monitorParas.tsSlowLogScope & SLOW_LOG_TYPE_INSERT) {
int32_t len = 0;
int32_t rlen = 0;
char *p = NULL;

View File

@ -35,34 +35,34 @@ int main(int argc, char** argv) {
return RUN_ALL_TESTS();
}
TEST(clientMonitorTest, monitorTest) {
const char* cluster1 = "cluster1";
const char* cluster2 = "cluster2";
SEpSet epSet;
clusterMonitorInit(cluster1, epSet, NULL);
const char* counterName1 = "slow_query";
const char* counterName2 = "select_count";
const char* help1 = "test for slowQuery";
const char* help2 = "test for selectSQL";
const char* lables[] = {"lable1"};
taos_counter_t* c1 = createClusterCounter(cluster1, counterName1, help1, 1, lables);
ASSERT_TRUE(c1 != NULL);
taos_counter_t* c2 = createClusterCounter(cluster1, counterName2, help2, 1, lables);
ASSERT_TRUE(c2 != NULL);
ASSERT_TRUE(c1 != c2);
taos_counter_t* c21 = createClusterCounter(cluster2, counterName1, help2, 1, lables);
ASSERT_TRUE(c21 == NULL);
clusterMonitorInit(cluster2, epSet, NULL);
c21 = createClusterCounter(cluster2, counterName1, help2, 1, lables);
ASSERT_TRUE(c21 != NULL);
int i = 0;
while (i < 12) {
taosMsleep(10);
++i;
}
clusterMonitorClose(cluster1);
clusterMonitorClose(cluster2);
}
//TEST(clientMonitorTest, monitorTest) {
// const char* cluster1 = "cluster1";
// const char* cluster2 = "cluster2";
// SEpSet epSet;
// clientMonitorInit(cluster1, epSet, NULL);
// const char* counterName1 = "slow_query";
// const char* counterName2 = "select_count";
// const char* help1 = "test for slowQuery";
// const char* help2 = "test for selectSQL";
// const char* lables[] = {"lable1"};
// taos_counter_t* c1 = createClusterCounter(cluster1, counterName1, help1, 1, lables);
// ASSERT_TRUE(c1 != NULL);
// taos_counter_t* c2 = createClusterCounter(cluster1, counterName2, help2, 1, lables);
// ASSERT_TRUE(c2 != NULL);
// ASSERT_TRUE(c1 != c2);
// taos_counter_t* c21 = createClusterCounter(cluster2, counterName1, help2, 1, lables);
// ASSERT_TRUE(c21 == NULL);
// clientMonitorInit(cluster2, epSet, NULL);
// c21 = createClusterCounter(cluster2, counterName1, help2, 1, lables);
// ASSERT_TRUE(c21 != NULL);
// int i = 0;
// while (i < 12) {
// taosMsleep(10);
// ++i;
// }
// clusterMonitorClose(cluster1);
// clusterMonitorClose(cluster2);
//}
TEST(clientMonitorTest, sendTest) {
TAOS* taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
@ -70,13 +70,100 @@ TEST(clientMonitorTest, sendTest) {
printf("connect taosd sucessfully.\n");
int64_t rid = *(int64_t *)taos;
SlowQueryLog(rid, false, -1, 1000);
slowQueryLog(rid, false, -1, 1000);
int i = 0;
while (i < 20) {
SlowQueryLog(rid, false, 0, i * 1000);
slowQueryLog(rid, false, 0, i * 1000);
taosMsleep(10);
++i;
}
taos_close(taos);
}
TEST(clientMonitorTest, ReadOneFile) {
// Create a TdFilePtr object and set it up for testing
TdFilePtr pFile = taosOpenFile("./tdengine-1-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
if (pFile == NULL) {
uError("failed to open file:./test.txt since %s", terrstr());
return;
}
const int batch = 10;
const int size = SLOW_LOG_SEND_SIZE/batch;
for(int i = 0; i < batch + 1; i++){
char value[size] = {0};
memset(value, '0' + i, size - 1);
if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
uError("failed to write len to file:%p since %s", pFile, terrstr());
}
}
// Create a void pointer and set it up for testing
void* pTransporter = NULL;
// Create an SEpSet object and set it up for testing
SEpSet* epSet = NULL;
// Call the function to be tested
monitorReadSendSlowLog(pFile, pTransporter, epSet);
char value[size] = {0};
memset(value, '0', size - 1);
if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
uError("failed to write len to file:%p since %s", pFile, terrstr());
}
monitorReadSendSlowLog(pFile, pTransporter, epSet);
// Clean up any resources created for testing
taosCloseFile(&pFile);
}
TEST(clientMonitorTest, ReadTwoFile) {
// Create a TdFilePtr object and set it up for testing
TdFilePtr pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-1-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
if (pFile == NULL) {
uError("failed to open file:./test.txt since %s", terrstr());
return;
}
const int batch = 10;
const int size = SLOW_LOG_SEND_SIZE/batch;
for(int i = 0; i < batch + 1; i++){
char value[size] = {0};
memset(value, '0' + i, size - 1);
if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
uError("failed to write len to file:%p since %s", pFile, terrstr());
}
}
taosFsyncFile(pFile);
taosCloseFile(&pFile);
pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-2-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
if (pFile == NULL) {
uError("failed to open file:./test.txt since %s", terrstr());
return;
}
for(int i = 0; i < batch + 1; i++){
char value[size] = {0};
memset(value, '0' + i, size - 1);
if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
uError("failed to write len to file:%p since %s", pFile, terrstr());
}
}
taosFsyncFile(pFile);
taosCloseFile(&pFile);
SAppInstInfo pAppInfo = {0};
pAppInfo.clusterId = 2;
pAppInfo.monitorParas.tsEnableMonitor = 1;
strcpy(tsTempDir,"/tmp");
monitorSendAllSlowLogFromTempDir(&pAppInfo);
}

View File

@ -1520,6 +1520,13 @@ TEST(clientCase, sub_tb_test) {
}
TEST(clientCase, sub_tb_mt_test) {
char *user = NULL;
char *auth = NULL;
char *ip = NULL;
int port = 0;
char key[512] = {0};
snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
TdThread qid[20] = {0};
@ -1532,4 +1539,38 @@ TEST(clientCase, sub_tb_mt_test) {
}
}
//static void concatStrings(SArray *list, char* buf, int size){
// int len = 0;
// for(int i = 0; i < taosArrayGetSize(list); i++){
// char* db = (char*)taosArrayGet(list, i);
// char* dot = strchr(db, '.');
// if (dot != NULL) {
// db = dot + 1;
// }
// if (i != 0){
// strcat(buf, ",");
// len += 1;
// }
// int ret = snprintf(buf + len, size - len, "%s", db);
// if (ret < 0) {
// printf("snprintf failed, buf:%s, ret:%d", buf, ret);
// break;
// }
// len += ret;
// if (len >= size){
// printf("dbList is truncated, buf:%s, len:%d", buf, len);
// break;
// }
// }
//}
//
//TEST(clientCase, concat_string_test) {
// SArray* list = taosArrayInit(10, TSDB_DB_FNAME_LEN);
// taosArrayPush(list, "1.db1");
// taosArrayPush(list, "2.db2");
//
// char buf[32] = {0};
// concatStrings(list, buf, sizeof(buf) - 1);
//}
#pragma GCC diagnostic pop

View File

@ -179,8 +179,12 @@ int32_t tsRedirectMaxPeriod = 1000;
int32_t tsMaxRetryWaitTime = 10000;
bool tsUseAdapter = false;
int32_t tsMetaCacheMaxSize = -1; // MB
int32_t tsSlowLogThreshold = 3; // seconds
int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL;
int32_t tsSlowLogThreshold = 10; // seconds
int32_t tsSlowLogThresholdTest = 10; // seconds
char tsSlowLogExceptDb[TSDB_DB_NAME_LEN] = ""; // seconds
int32_t tsSlowLogScope = SLOW_LOG_TYPE_QUERY;
char* tsSlowLogScopeString = "query";
int32_t tsSlowLogMaxLen = 4096;
int32_t tsTimeSeriesThreshold = 50;
bool tsMultiResultFunctionStarReturnTags = false;
@ -543,9 +547,6 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
return -1;
if (cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0)
return -1;
if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 0, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0)
return -1;
if (cfgAddString(pCfg, "slowLogScope", "", CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1;
tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
@ -570,9 +571,6 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
return -1;
if (cfgAddBool(pCfg, "experimental", tsExperimental, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "multiResultFunctionStarReturnTags", tsMultiResultFunctionStarReturnTags, CFG_SCOPE_CLIENT,
CFG_DYN_CLIENT) != 0)
return -1;
@ -702,6 +700,15 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt64(pCfg, "mndLogRetention", tsMndLogRetention, 500, 10000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "skipGrant", tsMndSkipGrant, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 86400, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "slowLogThresholdTest", tsSlowLogThresholdTest, 0, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 1, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "slowLogMaxLen", tsSlowLogMaxLen, 1, 16384, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "slowLogScope", tsSlowLogScopeString, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "slowLogExceptDb", tsSlowLogExceptDb, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorPort", tsMonitorPort, 1, 65056, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
@ -966,40 +973,52 @@ static void taosSetServerLogCfg(SConfig *pCfg) {
sndDebugFlag = cfgGetItem(pCfg, "sndDebugFlag")->i32;
}
static int32_t taosSetSlowLogScope(const char *pScope) {
static int32_t taosSetSlowLogScope(char *pScope) {
if (NULL == pScope || 0 == strlen(pScope)) {
tsSlowLogScope = SLOW_LOG_TYPE_ALL;
return 0;
return SLOW_LOG_TYPE_QUERY;
}
if (0 == strcasecmp(pScope, "all")) {
tsSlowLogScope = SLOW_LOG_TYPE_ALL;
return 0;
int32_t slowScope = 0;
char* scope = NULL;
char *tmp = NULL;
while((scope = strsep(&pScope, "|")) != NULL){
taosMemoryFreeClear(tmp);
tmp = strdup(scope);
strtrim(tmp);
if (0 == strcasecmp(tmp, "all")) {
slowScope |= SLOW_LOG_TYPE_ALL;
continue;
}
if (0 == strcasecmp(tmp, "query")) {
slowScope |= SLOW_LOG_TYPE_QUERY;
continue;
}
if (0 == strcasecmp(tmp, "insert")) {
slowScope |= SLOW_LOG_TYPE_INSERT;
continue;
}
if (0 == strcasecmp(tmp, "others")) {
slowScope |= SLOW_LOG_TYPE_OTHERS;
continue;
}
if (0 == strcasecmp(tmp, "none")) {
slowScope |= SLOW_LOG_TYPE_NULL;
continue;
}
taosMemoryFreeClear(tmp);
uError("Invalid slowLog scope value:%s", pScope);
terrno = TSDB_CODE_INVALID_CFG_VALUE;
return -1;
}
if (0 == strcasecmp(pScope, "query")) {
tsSlowLogScope = SLOW_LOG_TYPE_QUERY;
return 0;
}
if (0 == strcasecmp(pScope, "insert")) {
tsSlowLogScope = SLOW_LOG_TYPE_INSERT;
return 0;
}
if (0 == strcasecmp(pScope, "others")) {
tsSlowLogScope = SLOW_LOG_TYPE_OTHERS;
return 0;
}
if (0 == strcasecmp(pScope, "none")) {
tsSlowLogScope = 0;
return 0;
}
uError("Invalid slowLog scope value:%s", pScope);
terrno = TSDB_CODE_INVALID_CFG_VALUE;
return -1;
taosMemoryFreeClear(tmp);
return slowScope;
}
// for common configs
@ -1057,12 +1076,6 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
tsQueryMaxConcurrentTables = cfgGetItem(pCfg, "queryMaxConcurrentTables")->i64;
tsMetaCacheMaxSize = cfgGetItem(pCfg, "metaCacheMaxSize")->i32;
tsSlowLogThreshold = cfgGetItem(pCfg, "slowLogThreshold")->i32;
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
if (taosSetSlowLogScope(cfgGetItem(pCfg, "slowLogScope")->str)) {
return -1;
}
tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32;
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
@ -1136,6 +1149,16 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsSIMDEnable = (bool)cfgGetItem(pCfg, "simdEnable")->bval;
tsTagFilterCache = (bool)cfgGetItem(pCfg, "tagFilterCache")->bval;
tstrncpy(tsSlowLogExceptDb, cfgGetItem(pCfg, "slowLogExceptDb")->str, TSDB_DB_NAME_LEN);
tsSlowLogThresholdTest = cfgGetItem(pCfg, "slowLogThresholdTest")->i32;
tsSlowLogThreshold = cfgGetItem(pCfg, "slowLogThreshold")->i32;
tsSlowLogMaxLen = cfgGetItem(pCfg, "slowLogMaxLen")->i32;
int32_t scope = taosSetSlowLogScope(cfgGetItem(pCfg, "slowLogScope")->str);
if(scope < 0){
return -1;
}
tsSlowLogScope = scope;
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
tstrncpy(tsMonitorFqdn, cfgGetItem(pCfg, "monitorFqdn")->str, TSDB_FQDN_LEN);
@ -1423,8 +1446,7 @@ static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize,
char *name = pItem->name;
for (int32_t d = 0; d < optionSize; ++d) {
const char *optName = pOptions[d].optionName;
int32_t optLen = strlen(optName);
if (strncasecmp(name, optName, optLen) != 0) continue;
if (strcasecmp(name, optName) != 0) continue;
switch (pItem->dtype) {
case CFG_DTYPE_BOOL: {
int32_t flag = pItem->i32;
@ -1497,6 +1519,23 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
return 0;
}
if (strcasecmp("slowLogScope", name) == 0) {
int32_t scope = taosSetSlowLogScope(pItem->str);
if(scope < 0){
cfgUnLock(pCfg);
return -1;
}
tsSlowLogScope = scope;
cfgUnLock(pCfg);
return 0;
}
if (strcasecmp("slowLogExceptDb", name) == 0) {
tstrncpy(tsSlowLogExceptDb, pItem->str, TSDB_DB_NAME_LEN);
cfgUnLock(pCfg);
return 0;
}
{ // 'bool/int32_t/int64_t/float/double' variables with general modification function
static OptionNameAndVar debugOptions[] = {
{"dDebugFlag", &dDebugFlag}, {"vDebugFlag", &vDebugFlag}, {"mDebugFlag", &mDebugFlag},
@ -1514,6 +1553,10 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"enableWhiteList", &tsEnableWhiteList},
{"telemetryReporting", &tsEnableTelem},
{"monitor", &tsEnableMonitor},
{"monitorInterval", &tsMonitorInterval},
{"slowLogThreshold", &tsSlowLogThreshold},
{"slowLogThresholdTest", &tsSlowLogThresholdTest},
{"slowLogMaxLen", &tsSlowLogMaxLen},
{"mndSdbWriteDelta", &tsMndSdbWriteDelta},
{"minDiskFreeSize", &tsMinDiskFreeSize},
@ -1656,10 +1699,6 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
tsLogSpace.reserved = (int64_t)(((double)pItem->fval) * 1024 * 1024 * 1024);
uInfo("%s set to %" PRId64, name, tsLogSpace.reserved);
matched = true;
} else if (strcasecmp("monitor", name) == 0) {
tsEnableMonitor = pItem->bval;
uInfo("%s set to %d", name, tsEnableMonitor);
matched = true;
}
break;
}
@ -1703,13 +1742,6 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype, false);
uInfo("localEp set to '%s', tsFirst set to '%s'", tsLocalEp, tsFirst);
matched = true;
} else if (strcasecmp("slowLogScope", name) == 0) {
if (taosSetSlowLogScope(pItem->str)) {
cfgUnLock(pCfg);
return -1;
}
uInfo("%s set to %s", name, pItem->str);
matched = true;
}
break;
}
@ -1770,7 +1802,6 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
{"queryUseNodeAllocator", &tsQueryUseNodeAllocator},
{"smlDot2Underline", &tsSmlDot2Underline},
{"shellActivityTimer", &tsShellActivityTimer},
{"slowLogThreshold", &tsSlowLogThreshold},
{"useAdapter", &tsUseAdapter},
{"experimental", &tsExperimental},
{"multiResultFunctionStarReturnTags", &tsMultiResultFunctionStarReturnTags},

View File

@ -69,6 +69,28 @@
pReq->sql = NULL; \
} while (0)
static int32_t tSerializeSMonitorParas(SEncoder *encoder, const SMonitorParas* pMonitorParas) {
if (tEncodeI8(encoder, pMonitorParas->tsEnableMonitor) < 0) return -1;
if (tEncodeI32(encoder, pMonitorParas->tsMonitorInterval) < 0) return -1;
if (tEncodeI32(encoder, pMonitorParas->tsSlowLogScope) < 0) return -1;
if (tEncodeI32(encoder, pMonitorParas->tsSlowLogMaxLen) < 0) return -1;
if (tEncodeI32(encoder, pMonitorParas->tsSlowLogThreshold) < 0) return -1;
if (tEncodeI32(encoder, pMonitorParas->tsSlowLogThresholdTest) < 0) return -1;
if (tEncodeCStr(encoder, pMonitorParas->tsSlowLogExceptDb) < 0) return -1;
return 0;
}
static int32_t tDeserializeSMonitorParas(SDecoder *decoder, SMonitorParas* pMonitorParas){
if (tDecodeI8(decoder, (int8_t *)&pMonitorParas->tsEnableMonitor) < 0) return -1;
if (tDecodeI32(decoder, &pMonitorParas->tsMonitorInterval) < 0) return -1;
if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogScope) < 0) return -1;
if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogMaxLen) < 0) return -1;
if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogThreshold) < 0) return -1;
if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogThresholdTest) < 0) return -1;
if (tDecodeCStrTo(decoder, pMonitorParas->tsSlowLogExceptDb) < 0) return -1;
return 0;
}
static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq);
static int32_t tDecodeSBatchDeleteReqCommon(SDecoder *pDecoder, SBatchDeleteReq *pReq);
static int32_t tEncodeTableTSMAInfoRsp(SEncoder *pEncoder, const STableTSMAInfoRsp *pRsp);
@ -519,6 +541,7 @@ int32_t tSerializeSClientHbBatchRsp(void *buf, int32_t bufLen, const SClientHbBa
SClientHbRsp *pRsp = taosArrayGet(pBatchRsp->rsps, i);
if (tSerializeSClientHbRsp(&encoder, pRsp) < 0) return -1;
}
if (tSerializeSMonitorParas(&encoder, &pBatchRsp->monitorParas) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -546,6 +569,10 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR
taosArrayPush(pBatchRsp->rsps, &rsp);
}
if (!tDecodeIsEnd(&decoder)) {
if (tDeserializeSMonitorParas(&decoder, &pBatchRsp->monitorParas) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
@ -1308,6 +1335,9 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
}
if (tEncodeI64(&encoder, pReq->ipWhiteVer) < 0) return -1;
if (tSerializeSMonitorParas(&encoder, &pReq->clusterCfg.monitorParas) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -1426,6 +1456,10 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &pReq->ipWhiteVer) < 0) return -1;
}
if (!tDecodeIsEnd(&decoder)) {
if (tDeserializeSMonitorParas(&decoder, &pReq->clusterCfg.monitorParas) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
@ -1521,6 +1555,7 @@ int32_t tSerializeSStatisReq(void *buf, int32_t bufLen, SStatisReq *pReq) {
if (tEncodeI32(&encoder, pReq->contLen) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->pCont) < 0) return -1;
if (tEncodeI8(&encoder, pReq->type) < 0) return -1;
tEndEncode(&encoder);
@ -1541,7 +1576,9 @@ int32_t tDeserializeSStatisReq(void *buf, int32_t bufLen, SStatisReq *pReq) {
if (pReq->pCont == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->pCont) < 0) return -1;
}
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI8(&decoder, (int8_t*)&pReq->type) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
@ -5153,6 +5190,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tEncodeI32(&encoder, pRsp->passVer) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->authVer) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->whiteListVer) < 0) return -1;
if (tSerializeSMonitorParas(&encoder, &pRsp->monitorParas) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -5194,6 +5232,9 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
} else {
pRsp->whiteListVer = 0;
}
if (!tDecodeIsEnd(&decoder)) {
if (tDeserializeSMonitorParas(&decoder, &pRsp->monitorParas) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);

View File

@ -726,4 +726,118 @@ TEST(AlreadyAddGroupIdTest, GroupIdAddedWithDifferentLength) {
EXPECT_FALSE(result);
}
#define SLOW_LOG_TYPE_NULL 0x0
#define SLOW_LOG_TYPE_QUERY 0x1
#define SLOW_LOG_TYPE_INSERT 0x2
#define SLOW_LOG_TYPE_OTHERS 0x4
#define SLOW_LOG_TYPE_ALL 0x7
static int32_t taosSetSlowLogScope(char *pScope) {
if (NULL == pScope || 0 == strlen(pScope)) {
return SLOW_LOG_TYPE_QUERY;
}
int32_t slowScope = 0;
char* scope = NULL;
char *tmp = NULL;
while((scope = strsep(&pScope, "|")) != NULL){
taosMemoryFreeClear(tmp);
tmp = strdup(scope);
strtrim(tmp);
if (0 == strcasecmp(tmp, "all")) {
slowScope |= SLOW_LOG_TYPE_ALL;
continue;
}
if (0 == strcasecmp(tmp, "query")) {
slowScope |= SLOW_LOG_TYPE_QUERY;
continue;
}
if (0 == strcasecmp(tmp, "insert")) {
slowScope |= SLOW_LOG_TYPE_INSERT;
continue;
}
if (0 == strcasecmp(tmp, "others")) {
slowScope |= SLOW_LOG_TYPE_OTHERS;
continue;
}
if (0 == strcasecmp(tmp, "none")) {
slowScope |= SLOW_LOG_TYPE_NULL;
continue;
}
taosMemoryFreeClear(tmp);
uError("Invalid slowLog scope value:%s", pScope);
terrno = TSDB_CODE_INVALID_CFG_VALUE;
return -1;
}
taosMemoryFreeClear(tmp);
return slowScope;
}
TEST(TaosSetSlowLogScopeTest, NullPointerInput) {
char *pScope = NULL;
int32_t result = taosSetSlowLogScope(pScope);
EXPECT_EQ(result, SLOW_LOG_TYPE_QUERY);
}
TEST(TaosSetSlowLogScopeTest, EmptyStringInput) {
char pScope[1] = "";
int32_t result = taosSetSlowLogScope(pScope);
EXPECT_EQ(result, SLOW_LOG_TYPE_QUERY);
}
TEST(TaosSetSlowLogScopeTest, AllScopeInput) {
char pScope[] = "all";
int32_t result = taosSetSlowLogScope(pScope);
EXPECT_EQ(result, SLOW_LOG_TYPE_ALL);
}
TEST(TaosSetSlowLogScopeTest, QueryScopeInput) {
char pScope[] = " query";
int32_t result = taosSetSlowLogScope(pScope);
EXPECT_EQ(result, SLOW_LOG_TYPE_QUERY);
}
TEST(TaosSetSlowLogScopeTest, InsertScopeInput) {
char pScope[] = "insert";
int32_t result = taosSetSlowLogScope(pScope);
EXPECT_EQ(result, SLOW_LOG_TYPE_INSERT);
}
TEST(TaosSetSlowLogScopeTest, OthersScopeInput) {
char pScope[] = "others";
int32_t result = taosSetSlowLogScope(pScope);
EXPECT_EQ(result, SLOW_LOG_TYPE_OTHERS);
}
TEST(TaosSetSlowLogScopeTest, NoneScopeInput) {
char pScope[] = "none";
int32_t result = taosSetSlowLogScope(pScope);
EXPECT_EQ(result, SLOW_LOG_TYPE_NULL);
}
TEST(TaosSetSlowLogScopeTest, InvalidScopeInput) {
char pScope[] = "invalid";
int32_t result = taosSetSlowLogScope(pScope);
EXPECT_EQ(result, -1);
}
TEST(TaosSetSlowLogScopeTest, MixedScopesInput) {
char pScope[] = "query|insert|others|none";
int32_t result = taosSetSlowLogScope(pScope);
EXPECT_EQ(result, (SLOW_LOG_TYPE_QUERY | SLOW_LOG_TYPE_INSERT | SLOW_LOG_TYPE_OTHERS));
}
TEST(TaosSetSlowLogScopeTest, MixedScopesInputWithSpaces) {
char pScope[] = "query | insert | others ";
int32_t result = taosSetSlowLogScope(pScope);
EXPECT_EQ(result, (SLOW_LOG_TYPE_QUERY | SLOW_LOG_TYPE_INSERT | SLOW_LOG_TYPE_OTHERS));
}
#pragma GCC diagnostic pop

View File

@ -116,6 +116,13 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0;
req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat;
req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum;
req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor;
req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval;
req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope;
req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
req.clusterCfg.monitorParas.tsSlowLogThresholdTest = tsSlowLogThresholdTest;
tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);

View File

@ -143,6 +143,7 @@ typedef enum {
DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH,
DND_REASON_ENABLE_WHITELIST_NOT_MATCH,
DND_REASON_ENCRYPTION_KEY_NOT_MATCH,
DND_REASON_STATUS_MONITOR_NOT_MATCH,
DND_REASON_OTHERS
} EDndReason;

View File

@ -438,7 +438,27 @@ void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
}
}
#define CHECK_MONITOR_PARA(para) \
if (pCfg->monitorParas.para != para) { \
mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS; \
return DND_REASON_STATUS_MONITOR_NOT_MATCH;\
}
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
CHECK_MONITOR_PARA(tsEnableMonitor);
CHECK_MONITOR_PARA(tsMonitorInterval);
CHECK_MONITOR_PARA(tsSlowLogThreshold);
CHECK_MONITOR_PARA(tsSlowLogThresholdTest);
CHECK_MONITOR_PARA(tsSlowLogMaxLen);
CHECK_MONITOR_PARA(tsSlowLogScope);
if (0 != strcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb)) {
mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id, pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
return DND_REASON_STATUS_MONITOR_NOT_MATCH;
}
if (pCfg->statusInterval != tsStatusInterval) {
mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusInterval,
tsStatusInterval);
@ -530,6 +550,8 @@ static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
return stateChanged;
}
extern char* tsMonFwUri;
extern char* tsMonSlowLogUri;
static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStatisReq statisReq = {0};
@ -547,188 +569,14 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
mInfo("process statis req,\n %s", statisReq.pCont);
}
SJson *pJson = tjsonParse(statisReq.pCont);
int32_t ts_size = tjsonGetArraySize(pJson);
for (int32_t i = 0; i < ts_size; i++) {
SJson *item = tjsonGetArrayItem(pJson, i);
SJson *tables = tjsonGetObjectItem(item, "tables");
int32_t tableSize = tjsonGetArraySize(tables);
for (int32_t i = 0; i < tableSize; i++) {
SJson *table = tjsonGetArrayItem(tables, i);
char tableName[MONITOR_TABLENAME_LEN] = {0};
tjsonGetStringValue(table, "name", tableName);
SJson *metricGroups = tjsonGetObjectItem(table, "metric_groups");
int32_t size = tjsonGetArraySize(metricGroups);
for (int32_t i = 0; i < size; i++) {
SJson *item = tjsonGetArrayItem(metricGroups, i);
SJson *arrayTag = tjsonGetObjectItem(item, "tags");
int32_t tagSize = tjsonGetArraySize(arrayTag);
for (int32_t j = 0; j < tagSize; j++) {
SJson *item = tjsonGetArrayItem(arrayTag, j);
char tagName[MONITOR_TAG_NAME_LEN] = {0};
tjsonGetStringValue(item, "name", tagName);
if (strncmp(tagName, "cluster_id", MONITOR_TAG_NAME_LEN) == 0) {
tjsonDeleteItemFromObject(item, "value");
tjsonAddStringToObject(item, "value", strClusterId);
}
}
}
}
}
char *pCont = tjsonToString(pJson);
monSendContent(pCont);
if (pJson != NULL) {
tjsonDelete(pJson);
pJson = NULL;
}
if (pCont != NULL) {
taosMemoryFree(pCont);
pCont = NULL;
if (statisReq.type == MONITOR_TYPE_COUNTER){
monSendContent(statisReq.pCont, tsMonFwUri);
}else if(statisReq.type == MONITOR_TYPE_SLOW_LOG){
monSendContent(statisReq.pCont, tsMonSlowLogUri);
}
tFreeSStatisReq(&statisReq);
return 0;
/*
SJson* pJson = tjsonParse(statisReq.pCont);
int32_t ts_size = tjsonGetArraySize(pJson);
for(int32_t i = 0; i < ts_size; i++){
SJson* item = tjsonGetArrayItem(pJson, i);
SJson* tables = tjsonGetObjectItem(item, "tables");
int32_t tableSize = tjsonGetArraySize(tables);
for(int32_t i = 0; i < tableSize; i++){
SJson* table = tjsonGetArrayItem(tables, i);
char tableName[MONITOR_TABLENAME_LEN] = {0};
tjsonGetStringValue(table, "name", tableName);
SJson* metricGroups = tjsonGetObjectItem(table, "metric_groups");
int32_t size = tjsonGetArraySize(metricGroups);
for(int32_t i = 0; i < size; i++){
SJson* item = tjsonGetArrayItem(metricGroups, i);
SJson* arrayTag = tjsonGetObjectItem(item, "tags");
int32_t tagSize = tjsonGetArraySize(arrayTag);
char** labels = taosMemoryMalloc(sizeof(char*) * tagSize);
char** sample_labels = taosMemoryMalloc(sizeof(char*) * tagSize);
for(int32_t j = 0; j < tagSize; j++){
SJson* item = tjsonGetArrayItem(arrayTag, j);
*(labels + j) = taosMemoryMalloc(MONITOR_TAG_NAME_LEN);
tjsonGetStringValue(item, "name", *(labels + j));
*(sample_labels + j) = taosMemoryMalloc(MONITOR_TAG_VALUE_LEN);
tjsonGetStringValue(item, "value", *(sample_labels + j));
if(strncmp(*(labels + j), "cluster_id", MONITOR_TAG_NAME_LEN) == 0) {
strncpy(*(sample_labels + j), strClusterId, MONITOR_TAG_VALUE_LEN);
}
}
SJson* metrics = tjsonGetObjectItem(item, "metrics");
int32_t metricLen = tjsonGetArraySize(metrics);
for(int32_t j = 0; j < metricLen; j++){
SJson *item = tjsonGetArrayItem(metrics, j);
char name[MONITOR_METRIC_NAME_LEN] = {0};
tjsonGetStringValue(item, "name", name);
double value = 0;
tjsonGetDoubleValue(item, "value", &value);
double type = 0;
tjsonGetDoubleValue(item, "type", &type);
int32_t metricNameLen = strlen(name) + strlen(tableName) + 2;
char* metricName = taosMemoryMalloc(metricNameLen);
memset(metricName, 0, metricNameLen);
sprintf(metricName, "%s:%s", tableName, name);
taos_metric_t* metric = taos_collector_registry_get_metric(metricName);
if(metric == NULL){
if(type == 0){
metric = taos_counter_new(metricName, "", tagSize, (const char**)labels);
}
if(type == 1){
metric = taos_gauge_new(metricName, "", tagSize, (const char**)labels);
}
mTrace("fail to get metric from registry, new one metric:%p", metric);
if(taos_collector_registry_register_metric(metric) == 1){
if(type == 0){
taos_counter_destroy(metric);
}
if(type == 1){
taos_gauge_destroy(metric);
}
metric = taos_collector_registry_get_metric(metricName);
mTrace("fail to register metric, get metric from registry:%p", metric);
}
else{
mTrace("succeed to register metric:%p", metric);
}
}
else{
mTrace("get metric from registry:%p", metric);
}
if(type == 0){
taos_counter_add(metric, value, (const char**)sample_labels);
}
if(type == 1){
taos_gauge_set(metric, value, (const char**)sample_labels);
}
taosMemoryFreeClear(metricName);
}
for(int32_t j = 0; j < tagSize; j++){
taosMemoryFreeClear(*(labels + j));
taosMemoryFreeClear(*(sample_labels + j));
}
taosMemoryFreeClear(sample_labels);
taosMemoryFreeClear(labels);
}
}
}
code = 0;
_OVER:
if(pJson != NULL){
tjsonDelete(pJson);
pJson = NULL;
}
tFreeSStatisReq(&statisReq);
return code;
*/
}
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
@ -1092,6 +940,32 @@ _OVER:
return code;
}
static void getSlowLogScopeString(int32_t scope, char* result){
if(scope == SLOW_LOG_TYPE_NULL) {
strcat(result, "NONE");
return;
}
while(scope > 0){
if(scope & SLOW_LOG_TYPE_QUERY) {
strcat(result, "QUERY");
scope &= ~SLOW_LOG_TYPE_QUERY;
} else if(scope & SLOW_LOG_TYPE_INSERT) {
strcat(result, "INSERT");
scope &= ~SLOW_LOG_TYPE_INSERT;
} else if(scope & SLOW_LOG_TYPE_OTHERS) {
strcat(result, "OTHERS");
scope &= ~SLOW_LOG_TYPE_OTHERS;
} else{
printf("invalid slow log scope:%d", scope);
return;
}
if(scope > 0) {
strcat(result, "|");
}
}
}
static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) {
SShowVariablesRsp rsp = {0};
int32_t code = -1;
@ -1100,7 +974,7 @@ static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) {
goto _OVER;
}
rsp.variables = taosArrayInit(4, sizeof(SVariablesInfo));
rsp.variables = taosArrayInit(16, sizeof(SVariablesInfo));
if (NULL == rsp.variables) {
mError("failed to alloc SVariablesInfo array while process show variables req");
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -1129,6 +1003,33 @@ static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) {
strcpy(info.scope, "both");
taosArrayPush(rsp.variables, &info);
strcpy(info.name, "monitor");
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
strcpy(info.scope, "server");
taosArrayPush(rsp.variables, &info);
strcpy(info.name, "monitorInterval");
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
strcpy(info.scope, "server");
taosArrayPush(rsp.variables, &info);
strcpy(info.name, "slowLogThreshold");
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
strcpy(info.scope, "server");
taosArrayPush(rsp.variables, &info);
strcpy(info.name, "slowLogMaxLen");
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
strcpy(info.scope, "server");
taosArrayPush(rsp.variables, &info);
char scopeStr[64] = {0};
getSlowLogScopeString(tsSlowLogScope, scopeStr);
strcpy(info.name, "slowLogScope");
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);
strcpy(info.scope, "server");
taosArrayPush(rsp.variables, &info);
int32_t rspLen = tSerializeSShowVariablesRsp(NULL, 0, &rsp);
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
@ -1679,6 +1580,28 @@ static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
totalRows++;
cfgOpts[totalRows] = "monitor";
snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
totalRows++;
cfgOpts[totalRows] = "monitorInterval";
snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
totalRows++;
cfgOpts[totalRows] = "slowLogThreshold";
snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
totalRows++;
cfgOpts[totalRows] = "slowLogMaxLen";
snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
totalRows++;
char scopeStr[64] = {0};
getSlowLogScopeString(tsSlowLogScope, scopeStr);
cfgOpts[totalRows] = "slowLogScope";
snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);
totalRows++;
char buf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
char bufVal[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};

View File

@ -282,7 +282,6 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
}
}
_CONNECT:
pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp,
pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime);
if (pConn == NULL) {
@ -301,6 +300,13 @@ _CONNECT:
connectRsp.svrTimestamp = taosGetTimestampSec();
connectRsp.passVer = pUser->passVersion;
connectRsp.authVer = pUser->authVersion;
connectRsp.monitorParas.tsEnableMonitor = tsEnableMonitor;
connectRsp.monitorParas.tsMonitorInterval = tsMonitorInterval;
connectRsp.monitorParas.tsSlowLogScope = tsSlowLogScope;
connectRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
connectRsp.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
connectRsp.monitorParas.tsSlowLogThresholdTest = tsSlowLogThresholdTest;
tstrncpy(connectRsp.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
connectRsp.whiteListVer = pUser->ipWhiteListVer;
strcpy(connectRsp.sVer, version);
@ -660,6 +666,13 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
SClientHbBatchRsp batchRsp = {0};
batchRsp.svrTimestamp = taosGetTimestampSec();
batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
batchRsp.monitorParas.tsEnableMonitor = tsEnableMonitor;
batchRsp.monitorParas.tsMonitorInterval = tsMonitorInterval;
batchRsp.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
batchRsp.monitorParas.tsSlowLogThresholdTest = tsSlowLogThresholdTest;
tstrncpy(batchRsp.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
batchRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
batchRsp.monitorParas.tsSlowLogScope = tsSlowLogScope;
int32_t sz = taosArrayGetSize(batchReq.reqs);
for (int i = 0; i < sz; i++) {

View File

@ -16,3 +16,4 @@ add_subdirectory(stb)
add_subdirectory(topic)
add_subdirectory(trans)
#add_subdirectory(user)
#add_subdirectory(mnode)

View File

@ -1,11 +1,11 @@
# aux_source_directory(. MNODE_MNODE_TEST_SRC)
# add_executable(mmnodeTest ${MNODE_MNODE_TEST_SRC})
# target_link_libraries(
# mmnodeTest
# PUBLIC sut
# )
aux_source_directory(. MNODE_MNODE_TEST_SRC)
add_executable(mmnodeTest ${MNODE_MNODE_TEST_SRC})
target_link_libraries(
mmnodeTest
PUBLIC sut
)
# add_test(
# NAME mmnodeTest
# COMMAND mmnodeTest
# )
add_test(
NAME mmnodeTest
COMMAND mmnodeTest
)

View File

@ -281,3 +281,113 @@ TEST_F(MndTestMnode, 04_Drop_Mnode_Rollback) {
ASSERT_NE(retry, retryMax);
}
}
#define SLOW_LOG_TYPE_NULL 0x0
#define SLOW_LOG_TYPE_QUERY 0x1
#define SLOW_LOG_TYPE_INSERT 0x2
#define SLOW_LOG_TYPE_OTHERS 0x4
#define SLOW_LOG_TYPE_ALL 0x7
void getSlowLogScopeString(int32_t scope, char* result){
if(scope == SLOW_LOG_TYPE_NULL) {
strcat(result, "NONE");
return;
}
while(scope > 0){
if(scope & SLOW_LOG_TYPE_QUERY) {
strcat(result, "QUERY");
scope &= ~SLOW_LOG_TYPE_QUERY;
} else if(scope & SLOW_LOG_TYPE_INSERT) {
strcat(result, "INSERT");
scope &= ~SLOW_LOG_TYPE_INSERT;
} else if(scope & SLOW_LOG_TYPE_OTHERS) {
strcat(result, "OTHERS");
scope &= ~SLOW_LOG_TYPE_OTHERS;
} else{
printf("invalid slow log scope:%d", scope);
return;
}
if(scope > 0) {
strcat(result, "|");
}
}
}
// Define test cases
TEST_F(MndTestMnode, ScopeIsNull) {
// Arrange
char result[256] = {0};
// Act
getSlowLogScopeString(SLOW_LOG_TYPE_NULL, result);
// Assert
EXPECT_STREQ(result, "NONE");
}
TEST_F(MndTestMnode, ScopeIsQuery) {
// Arrange
char result[256] = {0};
// Act
getSlowLogScopeString(SLOW_LOG_TYPE_QUERY, result);
// Assert
EXPECT_STREQ(result, "QUERY");
}
TEST_F(MndTestMnode, ScopeIsInsert) {
// Arrange
char result[256] = {0};
// Act
getSlowLogScopeString(SLOW_LOG_TYPE_INSERT, result);
// Assert
EXPECT_STREQ(result, "INSERT");
}
TEST_F(MndTestMnode, ScopeIsOthers) {
// Arrange
char result[256] = {0};
// Act
getSlowLogScopeString(SLOW_LOG_TYPE_OTHERS, result);
// Assert
EXPECT_STREQ(result, "OTHERS");
}
TEST_F(MndTestMnode, ScopeIsMixed) {
// Arrange
char result[256] = {0};
// Act
getSlowLogScopeString(SLOW_LOG_TYPE_OTHERS|SLOW_LOG_TYPE_INSERT, result);
// Assert
EXPECT_STREQ(result, "INSERT|OTHERS");
}
TEST_F(MndTestMnode, ScopeIsMixed1) {
// Arrange
char result[256] = {0};
// Act
getSlowLogScopeString(SLOW_LOG_TYPE_ALL, result);
// Assert
EXPECT_STREQ(result, "QUERY|INSERT|OTHERS");
}
TEST_F(MndTestMnode, ScopeIsInvalid) {
// Arrange
char result[256] = {0};
// Act
getSlowLogScopeString(0xF000, result);
// Assert
EXPECT_STREQ(result, ""); // Expect an empty string since the scope is invalid
// You may also want to check if the error message is correctly logged
}

View File

@ -363,7 +363,7 @@ typedef struct SCtgUserAuth {
} SCtgUserAuth;
typedef struct SCatalog {
uint64_t clusterId;
int64_t clusterId;
bool stopUpdate;
SDynViewVersion dynViewVer;
SHashObj* userCache; // key:user, value:SCtgUserAuth

View File

@ -834,7 +834,7 @@ int32_t catalogInit(SCatalogCfg* cfg) {
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
int32_t catalogGetHandle(int64_t clusterId, SCatalog** catalogHandle) {
if (NULL == catalogHandle) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}

View File

@ -410,7 +410,7 @@ void ctgFreeHandle(SCatalog* pCtg) {
return;
}
uint64_t clusterId = pCtg->clusterId;
int64_t clusterId = pCtg->clusterId;
ctgFreeMetaRent(&pCtg->dbRent);
ctgFreeMetaRent(&pCtg->stbRent);
@ -498,7 +498,7 @@ void ctgClearHandle(SCatalog* pCtg) {
return;
}
uint64_t clusterId = pCtg->clusterId;
int64_t clusterId = pCtg->clusterId;
ctgFreeMetaRent(&pCtg->dbRent);
ctgFreeMetaRent(&pCtg->stbRent);

View File

@ -1,210 +0,0 @@
#include "clientMonitor.h"
#include "os.h"
#include "tmisce.h"
#include "ttime.h"
#include "ttimer.h"
#include "tglobal.h"
SRWLatch monitorLock;
void* tmrClientMonitor;
tmr_h tmrStartHandle;
SHashObj* clusterMonitorInfoTable;
static int interval = 30 * 1000;
static int sendBathchSize = 1;
int32_t sendReport(ClientMonitor* pMonitor, char* pCont);
void generateClusterReport(ClientMonitor* pMonitor, bool send) {
char ts[50];
sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI));
char* pCont = (char*)taos_collector_registry_bridge_new(pMonitor->registry, ts, "%" PRId64, NULL);
if(NULL == pCont) {
uError("generateClusterReport failed, get null content.");
return;
}
if (send && strlen(pCont) != 0) {
if (sendReport(pMonitor, pCont) == 0) {
taos_collector_registry_clear_batch(pMonitor->registry);
}
}
taosMemoryFreeClear(pCont);
}
void reportSendProcess(void* param, void* tmrId) {
taosTmrReset(reportSendProcess, tsMonitorInterval * 1000, NULL, tmrClientMonitor, &tmrStartHandle);
taosRLockLatch(&monitorLock);
static int index = 0;
index++;
ClientMonitor** ppMonitor = (ClientMonitor**)taosHashIterate(clusterMonitorInfoTable, NULL);
while (ppMonitor != NULL && *ppMonitor != NULL) {
ClientMonitor* pMonitor = *ppMonitor;
generateClusterReport(*ppMonitor, index == sendBathchSize);
ppMonitor = taosHashIterate(clusterMonitorInfoTable, ppMonitor);
}
if (index == sendBathchSize) index = 0;
taosRUnLockLatch(&monitorLock);
}
void monitorClientInitOnce() {
static int8_t init = 0;
if (atomic_exchange_8(&init, 1) == 0) {
uInfo("tscMonitorInit once.");
clusterMonitorInfoTable =
(SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
tmrClientMonitor = taosTmrInit(0, 0, 0, "MONITOR");
tmrStartHandle = taosTmrStart(reportSendProcess, tsMonitorInterval * 1000, NULL, tmrClientMonitor);
if(tsMonitorInterval < 1){
interval = 30 * 1000;
} else {
interval = tsMonitorInterval * 1000;
}
if (tsMonitorInterval < 10) {
sendBathchSize = (10 / sendBathchSize) + 1;
}
taosInitRWLatch(&monitorLock);
}
}
void createMonitorClient(const char* clusterKey, SEpSet epSet, void* pTransporter) {
if (clusterKey == NULL || strlen(clusterKey) == 0) {
uError("createMonitorClient failed, clusterKey is NULL");
return;
}
taosWLockLatch(&monitorLock);
if (taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey)) == NULL) {
uInfo("createMonitorClient for %s.", clusterKey);
ClientMonitor* pMonitor = taosMemoryCalloc(1, sizeof(ClientMonitor));
snprintf(pMonitor->clusterKey, sizeof(pMonitor->clusterKey), "%s", clusterKey);
pMonitor->registry = taos_collector_registry_new(clusterKey);
pMonitor->colector = taos_collector_new(clusterKey);
epsetAssign(&pMonitor->epSet, &epSet);
pMonitor->pTransporter = pTransporter;
taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector);
pMonitor->counters =
(SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
taosHashPut(clusterMonitorInfoTable, clusterKey, strlen(clusterKey), &pMonitor, sizeof(ClientMonitor*));
uInfo("createMonitorClient for %s finished %p.", clusterKey, pMonitor);
}
taosWUnLockLatch(&monitorLock);
}
static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
static int32_t emptyRspNum = 0;
if (TSDB_CODE_SUCCESS != code) {
uError("found error in monitorReport send callback, code:%d, please check the network.", code);
}
if (pMsg) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
}
return code;
}
int32_t sendReport(ClientMonitor* pMonitor, char* pCont) {
SStatisReq sStatisReq;
sStatisReq.pCont = pCont;
sStatisReq.contLen = strlen(pCont);
int tlen = tSerializeSStatisReq(NULL, 0, &sStatisReq);
if (tlen < 0) return 0;
void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSStatisReq(buf, tlen, &sStatisReq);
SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pInfo->fp = monitorReportAsyncCB;
pInfo->msgInfo.pData = buf;
pInfo->msgInfo.len = tlen;
pInfo->msgType = TDMT_MND_STATIS;
// pInfo->param = taosMemoryMalloc(sizeof(int32_t));
// *(int32_t*)pInfo->param = i;
pInfo->paramFreeFp = taosMemoryFree;
pInfo->requestId = tGenIdPI64();
pInfo->requestObjRefId = 0;
int64_t transporterId = 0;
return asyncSendMsgToServer(pMonitor->pTransporter, &pMonitor->epSet, &transporterId, pInfo);
}
void clusterMonitorInit(const char* clusterKey, SEpSet epSet, void* pTransporter) {
monitorClientInitOnce();
createMonitorClient(clusterKey, epSet, pTransporter);
}
taos_counter_t* createClusterCounter(const char* clusterKey, const char* name, const char* help, size_t label_key_count,
const char** label_keys) {
ClientMonitor** ppMonitor = (ClientMonitor**)taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey));
if (ppMonitor != NULL && *ppMonitor != NULL) {
ClientMonitor* pMonitor = *ppMonitor;
taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, name, strlen(name));
if (ppCounter != NULL && *ppCounter != NULL) {
taosHashRemove(pMonitor->counters, name, strlen(name));
uInfo("createClusterCounter remove old counter: %s.", name);
}
taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys);
if (newCounter != NULL) {
taos_collector_add_metric(pMonitor->colector, newCounter);
taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, sizeof(taos_counter_t*));
uInfo("createClusterCounter %s(%p):%s : %p.", pMonitor->clusterKey, pMonitor, name, newCounter);
return newCounter;
} else {
return NULL;
}
} else {
return NULL;
}
return NULL;
}
int taosClusterCounterInc(const char* clusterKey, const char* counterName, const char** label_values) {
taosRLockLatch(&monitorLock);
ClientMonitor** ppMonitor = (ClientMonitor**)taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey));
if (ppMonitor != NULL && *ppMonitor != NULL) {
ClientMonitor* pMonitor = *ppMonitor;
taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName));
if (ppCounter != NULL && *ppCounter != NULL) {
taos_counter_inc(*ppCounter, label_values);
} else {
uError("taosClusterCounterInc not found pCounter %s:%s.", clusterKey, counterName);
}
} else {
uError("taosClusterCounterInc not found pMonitor %s.", clusterKey);
}
taosRUnLockLatch(&monitorLock);
return 0;
}
void clusterMonitorClose(const char* clusterKey) {
taosWLockLatch(&monitorLock);
ClientMonitor** ppMonitor = (ClientMonitor**)taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey));
if (ppMonitor != NULL && *ppMonitor != NULL) {
ClientMonitor* pMonitor = *ppMonitor;
uInfo("clusterMonitorClose valule:%p clusterKey:%s.", pMonitor, pMonitor->clusterKey);
taosHashCleanup(pMonitor->counters);
taos_collector_registry_destroy(pMonitor->registry);
taosMemoryFree(pMonitor);
taosHashRemove(clusterMonitorInfoTable, clusterKey, strlen(clusterKey));
}
taosWUnLockLatch(&monitorLock);
}
const char* resultStr(SQL_RESULT_CODE code) {
static const char* result_state[] = {"Success", "Failed", "Cancel"};
return result_state[code];
}

View File

@ -24,6 +24,7 @@
SMonitor tsMonitor = {0};
char* tsMonUri = "/report";
char* tsMonFwUri = "/general-metric";
char* tsMonSlowLogUri = "/slow-sql-detail-batch";
char* tsMonFwBasicUri = "/taosd-cluster-basic";
void monRecordLog(int64_t ts, ELogLevel level, const char *content) {
@ -631,7 +632,7 @@ void monGenAndSendReportBasic() {
monCleanupMonitorInfo(pMonitor);
}
void monSendContent(char *pCont) {
void monSendContent(char *pCont, const char* uri) {
if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return;
if(tsMonitorLogProtocol){
if (pCont != NULL){
@ -640,7 +641,7 @@ void monSendContent(char *pCont) {
}
if (pCont != NULL) {
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
if (taosSendHttpReport(tsMonitor.cfg.server, tsMonFwUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) {
if (taosSendHttpReport(tsMonitor.cfg.server, uri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) {
uError("failed to send monitor msg");
}
}

View File

@ -7254,6 +7254,7 @@ static int32_t buildCmdMsg(STranslateContext* pCxt, int16_t msgType, FSerializeF
pCxt->pCmdMsg->msgLen = func(NULL, 0, pReq);
pCxt->pCmdMsg->pMsg = taosMemoryMalloc(pCxt->pCmdMsg->msgLen);
if (NULL == pCxt->pCmdMsg->pMsg) {
taosMemoryFreeClear(pCxt->pCmdMsg);
return TSDB_CODE_OUT_OF_MEMORY;
}
func(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, pReq);

View File

@ -78,7 +78,9 @@ void osDefaultInit() {
}
strcpy(tsDataDir, TD_DATA_DIR_PATH);
strcpy(tsLogDir, TD_LOG_DIR_PATH);
strcpy(tsTempDir, TD_TMP_DIR_PATH);
if(strlen(tsTempDir) == 0){
strcpy(tsTempDir, TD_TMP_DIR_PATH);
}
}
void osUpdate() {

View File

@ -66,7 +66,7 @@ typedef struct TdFile {
void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath) {
#ifdef WINDOWS
const char *tdengineTmpFileNamePrefix = "tdengine-";
char tmpPath[PATH_MAX];
int32_t len = (int32_t)strlen(inputTmpDir);
@ -76,7 +76,7 @@ void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, cha
tmpPath[len++] = '\\';
}
strcpy(tmpPath + len, tdengineTmpFileNamePrefix);
strcpy(tmpPath + len, TD_TMP_FILE_PREFIX);
if (strlen(tmpPath) + strlen(fileNamePrefix) + strlen("-%d-%s") < PATH_MAX) {
strcat(tmpPath, fileNamePrefix);
strcat(tmpPath, "-%d-%s");
@ -88,8 +88,6 @@ void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, cha
#else
const char *tdengineTmpFileNamePrefix = "tdengine-";
char tmpPath[PATH_MAX];
int32_t len = strlen(inputTmpDir);
memcpy(tmpPath, inputTmpDir, len);
@ -99,7 +97,7 @@ void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, cha
tmpPath[len++] = '/';
}
strcpy(tmpPath + len, tdengineTmpFileNamePrefix);
strcpy(tmpPath + len, TD_TMP_FILE_PREFIX);
if (strlen(tmpPath) + strlen(fileNamePrefix) + strlen("-%d-%s") < PATH_MAX) {
strcat(tmpPath, fileNamePrefix);
strcat(tmpPath, "-%d-%s");

View File

@ -371,6 +371,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_ENCRYPTKEY, "invalid encryption ke
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_ENCRYPTKEY_CHANGED, "encryption key was changed")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN, "Invalid encryption key length")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL,"statusInterval not match")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_MONITOR_PARAS, "monitor paras not match")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_TIMEZONE, "timezone not match")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_CHARSET, "charset not match")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_LOCALE, "locale not match")

View File

@ -70,7 +70,6 @@ class TDTestCase(TBase):
"smlTagName tagname",
"smlTsDefaultName tsdef",
"serverPort 6030",
"slowLogScope insert",
"timezone tz",
"tempDir /var/tmp"
]

View File

@ -230,7 +230,7 @@ endi
sql_error show create stable t0;
sql show variables;
if $rows != 4 then
if $rows != 9 then
return -1
endi

View File

@ -120,7 +120,7 @@ if $rows != 3 then
endi
sql show variables;
if $rows != 4 then
if $rows != 9 then
return -1
endi

View File

@ -45,7 +45,7 @@ class TDTestCase:
def case2(self):
tdSql.query("show variables")
tdSql.checkRows(4)
tdSql.checkRows(9)
for i in range(self.replicaVar):
tdSql.query("show dnode %d variables like 'debugFlag'" % (i + 1))