diff --git a/include/common/tglobal.h b/include/common/tglobal.h index e7035fe297..dd9589ccd4 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -24,10 +24,11 @@ extern "C" { #endif +#define SLOW_LOG_TYPE_NULL 0x0 #define SLOW_LOG_TYPE_QUERY 0x1 #define SLOW_LOG_TYPE_INSERT 0x2 #define SLOW_LOG_TYPE_OTHERS 0x4 -#define SLOW_LOG_TYPE_ALL 0xFFFFFFFF +#define SLOW_LOG_TYPE_ALL 0x7 typedef enum { DND_CA_SM4 = 1, @@ -177,7 +178,10 @@ extern int32_t tsMaxRetryWaitTime; extern bool tsUseAdapter; extern int32_t tsMetaCacheMaxSize; extern int32_t tsSlowLogThreshold; +extern int32_t tsSlowLogThresholdTest; +extern char tsSlowLogExceptDb[]; extern int32_t tsSlowLogScope; +extern int32_t tsSlowLogMaxLen; extern int32_t tsTimeSeriesThreshold; extern bool tsMultiResultFunctionStarReturnTags; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 2aa9957ce8..a7eae9f19c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -654,6 +654,16 @@ void tFreeSSubmitRsp(SSubmitRsp* pRsp); #define SSCHMEA_BYTES(s) ((s)->bytes) #define SSCHMEA_NAME(s) ((s)->name) +typedef struct { + bool tsEnableMonitor; + int32_t tsMonitorInterval; + int32_t tsSlowLogThreshold; + int32_t tsSlowLogMaxLen; + int32_t tsSlowLogScope; + int32_t tsSlowLogThresholdTest; + char tsSlowLogExceptDb[TSDB_DB_NAME_LEN]; +} SMonitorParas; + typedef struct { int32_t nCols; int32_t version; @@ -968,6 +978,7 @@ typedef struct { char sVer[TSDB_VERSION_LEN]; char sDetailVer[128]; int64_t whiteListVer; + SMonitorParas monitorParas; } SConnectRsp; int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp); @@ -1635,6 +1646,7 @@ typedef struct { int8_t enableWhiteList; int8_t encryptionKeyStat; uint32_t encryptionKeyChksum; + SMonitorParas monitorParas; } SClusterCfg; typedef struct { @@ -1726,9 +1738,15 @@ int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); void 
tFreeSStatusReq(SStatusReq* pReq); +typedef enum { + MONITOR_TYPE_COUNTER = 0, + MONITOR_TYPE_SLOW_LOG = 1, +} MONITOR_TYPE; + typedef struct { - int32_t contLen; - char* pCont; + int32_t contLen; + char* pCont; + MONITOR_TYPE type; } SStatisReq; int32_t tSerializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq); @@ -3260,6 +3278,7 @@ typedef struct { int64_t rspId; int32_t svrTimestamp; SArray* rsps; // SArray + SMonitorParas monitorParas; } SClientHbBatchRsp; static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { return taosIntHash_64(key, keyLen); } diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 90cc4ac157..3f1cf74cfa 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -206,7 +206,7 @@ int32_t catalogInit(SCatalogCfg* cfg); * @param catalogHandle (output, NO need to free it) * @return error code */ -int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle); +int32_t catalogGetHandle(int64_t clusterId, SCatalog** catalogHandle); int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t* tableNum, int64_t* stateTs); diff --git a/include/libs/monitor/clientMonitor.h b/include/libs/monitor/clientMonitor.h index 9c0302a15f..bdb77bab28 100644 --- a/include/libs/monitor/clientMonitor.h +++ b/include/libs/monitor/clientMonitor.h @@ -23,6 +23,7 @@ extern "C" { #include "taos_monitor.h" #include "thash.h" #include "query.h" +#include "tqueue.h" typedef enum SQL_RESULT_CODE { SQL_RESULT_SUCCESS = 0, @@ -30,23 +31,41 @@ typedef enum SQL_RESULT_CODE { SQL_RESULT_CANCEL = 2, } SQL_RESULT_CODE; -const char* resultStr(SQL_RESULT_CODE code); +#define SLOW_LOG_SEND_SIZE 1024*1024 +extern tsem2_t monitorSem; +extern STaosQueue* monitorQueue; typedef struct { - char clusterKey[512]; - SEpSet epSet; - void* pTransporter; + int64_t clusterId; taos_collector_registry_t* registry; taos_collector_t* colector; SHashObj* 
counters; -} ClientMonitor; + void* timer; +} MonitorClient; -void clusterMonitorInit(const char* clusterKey, SEpSet epSet, void* pTransporter); -void clusterMonitorClose(const char* clusterKey); -taos_counter_t* createClusterCounter(const char* clusterKey, const char* name, const char* help, size_t label_key_count, - const char** label_keys); -int taosClusterCounterInc(const char* clusterKey, const char* counterName, const char** label_values); -void cluster_monitor_stop(); +typedef struct { + TdFilePtr pFile; + void* timer; +} SlowLogClient; + +typedef struct { + int64_t clusterId; + char *value; +} MonitorSlowLogData; + +void monitorClose(); +void monitorInit(); +void monitorSendAllSlowLogFromTempDir(void* pInst); + +void monitorClientSQLReqInit(int64_t clusterKey); +void monitorClientSlowQueryInit(int64_t clusterId); +void monitorCreateClient(int64_t clusterId); +void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count, const char** label_keys); +void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values); +void* monitorThreadFunc(void *param); +void monitorFreeSlowLogData(MonitorSlowLogData* pData); +const char* monitorResultStr(SQL_RESULT_CODE code); +void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *epSet); #ifdef __cplusplus } diff --git a/include/libs/monitor/monitor.h b/include/libs/monitor/monitor.h index 256be26999..9d7878ecf7 100644 --- a/include/libs/monitor/monitor.h +++ b/include/libs/monitor/monitor.h @@ -227,7 +227,7 @@ void monSetSmInfo(SMonSmInfo *pInfo); void monSetBmInfo(SMonBmInfo *pInfo); void monGenAndSendReport(); void monGenAndSendReportBasic(); -void monSendContent(char *pCont); +void monSendContent(char *pCont, const char* uri); void tFreeSMonMmInfo(SMonMmInfo *pInfo); void tFreeSMonVmInfo(SMonVmInfo *pInfo); diff --git a/include/os/osFile.h b/include/os/osFile.h index 9c9027e931..4c56244278 100644 --- a/include/os/osFile.h +++ 
b/include/os/osFile.h @@ -72,6 +72,9 @@ TdFilePtr taosCreateFile(const char *path, int32_t tdFileOptions); #define TD_FILE_ACCESS_EXIST_OK 0x1 #define TD_FILE_ACCESS_READ_OK 0x2 #define TD_FILE_ACCESS_WRITE_OK 0x4 + +#define TD_TMP_FILE_PREFIX "tdengine-" + bool taosCheckAccessFile(const char *pathname, int mode); int32_t taosLockFile(TdFilePtr pFile); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2e407d26b0..a5688ca04b 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -456,6 +456,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_DNODE_INVALID_LOCALE TAOS_DEF_ERROR_CODE(0, 0x0426) #define TSDB_CODE_DNODE_INVALID_TTL_CHG_ON_WR TAOS_DEF_ERROR_CODE(0, 0x0427) #define TSDB_CODE_DNODE_INVALID_EN_WHITELIST TAOS_DEF_ERROR_CODE(0, 0x0428) +#define TSDB_CODE_DNODE_INVALID_MONITOR_PARAS TAOS_DEF_ERROR_CODE(0, 0x0429) // mnode-sma #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) diff --git a/include/util/tdef.h b/include/util/tdef.h index 9e61ec8fe6..9c2858ed30 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -573,7 +573,7 @@ enum { #define TSDB_CONFIG_OPTION_LEN 32 #define TSDB_CONFIG_VALUE_LEN 64 #define TSDB_CONFIG_SCOPE_LEN 8 -#define TSDB_CONFIG_NUMBER 8 +#define TSDB_CONFIG_NUMBER 16 #define QUERY_ID_SIZE 20 #define QUERY_OBJ_ID_SIZE 18 diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 2fbf4eeabb..7a84215e12 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -115,10 +115,11 @@ struct SAppInstInfo { SArray* pQnodeList; SAppClusterSummary summary; SList* pConnList; // STscObj linked list - uint64_t clusterId; + int64_t clusterId; void* pTransporter; SAppHbMgr* pAppHbMgr; char* instKey; + SMonitorParas monitorParas; }; typedef struct SAppInfo { @@ -127,6 +128,7 @@ typedef struct SAppInfo { int32_t pid; int32_t numOfThreads; SHashObj* pInstMap; + SHashObj* pInstMapByClusterId; TdThreadMutex mutex; } SAppInfo; @@ -350,7 +352,7 @@ 
void* createTscObj(const char* user, const char* auth, const char* db, int32_ void destroyTscObj(void* pObj); STscObj* acquireTscObj(int64_t rid); int32_t releaseTscObj(int64_t rid); -void destroyAppInst(SAppInstInfo* pAppInfo); +void destroyAppInst(void* pAppInfo); uint64_t generateRequestId(); @@ -403,7 +405,7 @@ void hbRemoveAppHbMrg(SAppHbMgr** pAppHbMgr); void destroyAllRequests(SHashObj* pRequests); void stopAllRequests(SHashObj* pRequests); -SAppInstInfo* getAppInstInfo(const char* clusterKey); +//SAppInstInfo* getAppInstInfo(const char* clusterKey); // conn level int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); @@ -441,10 +443,8 @@ void freeQueryParam(SSyncQueryParam* param); int32_t clientParseSqlImpl(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effeciveUser, SParseSqlRes* pRes); #endif -void clientSlowQueryMonitorInit(const char* clusterKey); -void SlowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost); -void clientSQLReqMonitorInit(const char* clusterKey); +void slowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost); enum { MONITORSQLTYPESELECT = 0, @@ -454,8 +454,6 @@ enum { void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type); -void clientMonitorClose(const char* clusterKey); - #ifdef __cplusplus } #endif diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 23b9649c6b..15568669f1 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -13,9 +13,12 @@ * along with this program. If not, see . 
*/
+#include
+#include "cJSON.h"
 #include "catalog.h"
 #include "clientInt.h"
 #include "clientLog.h"
+#include "clientMonitor.h"
 #include "functionMgt.h"
 #include "os.h"
 #include "osSleep.h"
@@ -26,6 +29,7 @@
 #include "tglobal.h"
 #include "thttp.h"
 #include "tmsg.h"
+#include "tqueue.h"
 #include "tref.h"
 #include "trpc.h"
 #include "tsched.h"
@@ -70,6 +74,125 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
   return TSDB_CODE_SUCCESS;
 }

+// Join the names stored in `list` into `buf` as a comma separated string.
+// For every entry the text up to and including the first '.' is dropped.
+// Never writes more than `size` bytes (terminator included); when the names do
+// not fit the result is cut short and the truncation is logged.
+// `buf` is left untouched when `list` has no entries.
+static void concatStrings(SArray *list, char* buf, int size){
+  if (buf == NULL || size <= 0) {
+    return;
+  }
+  int len = 0;
+  for(int i = 0; i < taosArrayGetSize(list); i++){
+    char* db = taosArrayGet(list, i);
+    char* dot = strchr(db, '.');
+    if (dot != NULL) {
+      db = dot + 1;
+    }
+    // separator and name go through the same bounded snprintf; a bare strcat
+    // here could write one byte past `size` once the buffer is full
+    int ret = snprintf(buf + len, size - len, "%s%s", (i != 0) ? "," : "", db);
+    if (ret < 0) {
+      tscError("snprintf failed, buf:%s, ret:%d", buf, ret);
+      break;
+    }
+    len += ret;
+    if (len >= size){
+      tscInfo("dbList is truncated, buf:%s, len:%d", buf, len);
+      break;
+    }
+  }
+}
+
+// Build a JSON record for one slow request, queue its serialized form on
+// monitorQueue and post monitorSem when the queue write succeeds.
+// Nothing is queued if the JSON object, the queue item or the serialized
+// string cannot be allocated.
+//   pTscObj  - connection of the request (user name, app instance parameters)
+//   pRequest - the request being reported (sql text, start time, code, db names)
+//   reqType  - stored as-is in the "type" field
+//   duration - stored in "query_time" after dividing by 1000
+static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_t reqType, int64_t duration){
+  cJSON* json = cJSON_CreateObject();
+  if (json == NULL) {
+    tscError("[monitor] cJSON_CreateObject failed");
+    return;
+  }
+  char clusterId[32] = {0};
+  if (snprintf(clusterId, sizeof(clusterId), "%" PRId64, pTscObj->pAppInfo->clusterId) < 0){
+    uError("failed to generate clusterId:%" PRId64, pTscObj->pAppInfo->clusterId);
+  }
+
+  char startTs[32] = {0};
+  if (snprintf(startTs, sizeof(startTs), "%" PRId64, pRequest->metric.start/1000) < 0){
+    uError("failed to generate startTs:%" PRId64, pRequest->metric.start/1000);
+  }
+
+  char requestId[32] = {0};
+  if (snprintf(requestId, sizeof(requestId), "%" PRIu64, pRequest->requestId) < 0){
+    uError("failed to generate requestId:%" PRIu64, pRequest->requestId);
+  }
+  cJSON_AddItemToObject(json, "cluster_id", cJSON_CreateString(clusterId));
+  cJSON_AddItemToObject(json, "start_ts", cJSON_CreateString(startTs));
+  cJSON_AddItemToObject(json, "request_id", cJSON_CreateString(requestId));
+  cJSON_AddItemToObject(json, "query_time", cJSON_CreateNumber(duration/1000));
+  cJSON_AddItemToObject(json, "code", cJSON_CreateNumber(pRequest->code));
+  cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code)));
+  cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType));
+  cJSON_AddItemToObject(json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.totalRows));
+
+  // Read the limit once: monitorParas is overwritten by the heartbeat callback,
+  // and the byte saved below must be restored at the index it was taken from.
+  // A negative limit keeps the full statement, as the unsigned comparison did.
+  int32_t maxLen = pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen;
+  if(maxLen >= 0 && strlen(pRequest->sqlstr) > (size_t)maxLen){
+    // cut the statement in place just long enough to copy it into the JSON
+    char tmp = pRequest->sqlstr[maxLen];
+    pRequest->sqlstr[maxLen] = '\0';
+    cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr));
+    pRequest->sqlstr[maxLen] = tmp;
+  }else{
+    cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr));
+  }
+
+  cJSON_AddItemToObject(json, "user", cJSON_CreateString(pTscObj->user));
+  cJSON_AddItemToObject(json, "process_name", cJSON_CreateString(appInfo.appName));
+  cJSON_AddItemToObject(json, "ip", cJSON_CreateString(tsLocalFqdn));
+
+  char pid[32] = {0};
+  if (snprintf(pid, sizeof(pid), "%d", appInfo.pid) < 0){
+    uError("failed to generate pid:%d", appInfo.pid);
+  }
+
+  cJSON_AddItemToObject(json, "process_id", cJSON_CreateString(pid));
+  if(pRequest->dbList != NULL){
+    char dbList[1024] = {0};
+    concatStrings(pRequest->dbList, dbList, sizeof(dbList) - 1);
+    cJSON_AddItemToObject(json, "db", cJSON_CreateString(dbList));
+  }else if(pRequest->pDb != NULL){
+    cJSON_AddItemToObject(json, "db", cJSON_CreateString(pRequest->pDb));
+  }else{
+    cJSON_AddItemToObject(json, "db", cJSON_CreateString("unknown"));
+  }
+
+
+  MonitorSlowLogData* slowLogData = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0);
+  if (slowLogData == NULL) {
+    cJSON_Delete(json);
+    tscError("[monitor] failed to allocate slow log data");
+    return;
+  }
+  slowLogData->clusterId = pTscObj->pAppInfo->clusterId;
+  slowLogData->value = cJSON_PrintUnformatted(json);
+  if (slowLogData->value == NULL) {
+    // do not log or queue a NULL string
+    tscError("[monitor] failed to serialize slow log, clusterId:%" PRIx64, slowLogData->clusterId);
+    taosFreeQitem(slowLogData);
+    cJSON_Delete(json);
+    return;
+  }
+  tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value);
+  if (taosWriteQitem(monitorQueue, slowLogData) == 0){
+    tsem2_post(&monitorSem);
+  }else{
+    monitorFreeSlowLogData(slowLogData);
+    taosFreeQitem(slowLogData);
+  }
+  cJSON_Delete(json);
+}
+
+// Returns false when the request touches the database named `exceptDb`, true
+// otherwise. When pRequest->pDb is set it is the only name compared; otherwise
+// every dbList entry is compared after dropping the text through its first '.'.
+static bool checkSlowLogExceptDb(SRequestObj *pRequest, char* exceptDb) {
+  if (pRequest->pDb != NULL) {
+    return strcmp(pRequest->pDb, exceptDb) != 0;
+  }
+
+  for (int i = 0; i < taosArrayGetSize(pRequest->dbList); i++) {
+    char *db = taosArrayGet(pRequest->dbList, i);
+    char *dot = strchr(db, '.');
+    if (dot != NULL) {
+      db = dot + 1;
+    }
+    if(strcmp(db, exceptDb) == 0){
+      return false;
+    }
+  }
+  return true;
+}
+
 static void deregisterRequest(SRequestObj *pRequest) {
   if (pRequest == NULL) {
     tscError("pRequest == NULL");
@@ -113,21 +236,27 @@ static void deregisterRequest(SRequestObj *pRequest) {
     nodesSimReleaseAllocator(pRequest->allocatorRefId);
   }

-  if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) {
-    sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT);
-  } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
-    sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPESELECT);
-  } else if (QUERY_NODE_DELETE_STMT == pRequest->stmtType) {
-    sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEDELETE);
+  if(pTscObj->pAppInfo->monitorParas.tsEnableMonitor){
+    if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) {
+      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT);
+    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
+      sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPESELECT);
+    } else if (QUERY_NODE_DELETE_STMT ==
pRequest->stmtType) { + sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEDELETE); + } } - if (duration >= (tsSlowLogThreshold * 1000000UL)) { + if ((duration >= pTscObj->pAppInfo->monitorParas.tsSlowLogThreshold * 1000000UL || duration >= pTscObj->pAppInfo->monitorParas.tsSlowLogThresholdTest * 1000000UL) && + checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->monitorParas.tsSlowLogExceptDb)) { atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1); - if (tsSlowLogScope & reqType) { - taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 ", Duration:%" PRId64 "us, SQL:%s", + if (pTscObj->pAppInfo->monitorParas.tsSlowLogScope & reqType) { + taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 " us, Duration:%" PRId64 "us, SQL:%s", taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration, pRequest->sqlstr); - SlowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration); + if(pTscObj->pAppInfo->monitorParas.tsEnableMonitor){ + slowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration); + generateWriteSlowLog(pTscObj, pRequest, reqType, duration); + } } } @@ -233,14 +362,13 @@ void stopAllRequests(SHashObj *pRequests) { } } -void destroyAppInst(SAppInstInfo *pAppInfo) { +void destroyAppInst(void *info) { + SAppInstInfo* pAppInfo = *(SAppInstInfo**)info; tscDebug("destroy app inst mgr %p", pAppInfo); taosThreadMutexLock(&appInfo.mutex); - clientMonitorClose(pAppInfo->instKey); hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr); - taosHashRemove(appInfo.pInstMap, pAppInfo->instKey, strlen(pAppInfo->instKey)); taosThreadMutexUnlock(&appInfo.mutex); @@ -477,13 +605,11 @@ void doDestroyRequest(void *p) { destorySqlCallbackWrapper(pRequest->pWrapper); taosMemoryFreeClear(pRequest->msgBuf); - taosMemoryFreeClear(pRequest->pDb); doFreeReqResultInfo(&pRequest->body.resInfo); tsem_destroy(&pRequest->body.rspSem); taosArrayDestroy(pRequest->tableList); - 
taosArrayDestroy(pRequest->dbList); taosArrayDestroy(pRequest->targetTableList); destroyQueryExecRes(&pRequest->body.resInfo.execRes); @@ -492,6 +618,8 @@ void doDestroyRequest(void *p) { deregisterRequest(pRequest); } + taosMemoryFreeClear(pRequest->pDb); + taosArrayDestroy(pRequest->dbList); if (pRequest->body.interParam) { tsem_destroy(&((SSyncQueryParam *)pRequest->body.interParam)->sem); } @@ -670,7 +798,7 @@ void tscStopCrashReport() { } if (atomic_val_compare_exchange_32(&clientStop, 0, 1)) { - tscDebug("hb thread already stopped"); + tscDebug("crash report thread already stopped"); return; } @@ -719,7 +847,8 @@ void taos_init_imp(void) { appInfo.pid = taosGetPId(); appInfo.startTime = taosGetTimestampMs(); appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - + appInfo.pInstMapByClusterId = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst); deltaToUtcInitOnce(); char logDirName[64] = {0}; @@ -769,6 +898,7 @@ void taos_init_imp(void) { taosThreadMutexInit(&appInfo.mutex, NULL); tscCrashReportInit(); + monitorInit(); tscDebug("client is initialized successfully"); } diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 483edcf3f0..19b6655af1 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -18,6 +18,7 @@ #include "clientLog.h" #include "scheduler.h" #include "trpc.h" +#include "tglobal.h" typedef struct { union { @@ -67,7 +68,7 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC } static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) { - uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; + int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; for (int i = 0; i < TARRAY_SIZE(clientHbMgr.appHbMgrs); ++i) { SAppHbMgr *hbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); if (!hbMgr || 
hbMgr->pAppInstInfo->clusterId != clusterId) { @@ -545,6 +546,9 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { } SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo; + pInst->monitorParas = pRsp.monitorParas; + tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", + pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope); if (code != 0) { pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1; @@ -1129,7 +1133,7 @@ int32_t hbGatherAppInfo(void) { SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); if (pAppHbMgr == NULL) continue; - uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; + int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId)); if (NULL == pApp) { memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary)); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 1057a00725..11d3797157 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -147,7 +147,7 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas } p->pAppHbMgr = appHbMgrInit(p, key); if (NULL == p->pAppHbMgr) { - destroyAppInst(p); + destroyAppInst(&p); taosThreadMutexUnlock(&appInfo.mutex); taosMemoryFreeClear(key); return NULL; @@ -158,9 +158,6 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas tscDebug("new app inst mgr %p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port); pInst = &p; - - clientSlowQueryMonitorInit(p->instKey); - clientSQLReqMonitorInit(p->instKey); } else { ASSERTS((*pInst) && (*pInst)->pAppHbMgr, "*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL); // reset to 0 in case of conn with duplicated user key but its user has ever been dropped. 
@@ -174,14 +171,14 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType); } -SAppInstInfo* getAppInstInfo(const char* clusterKey) { - SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey)); - if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) { - return *ppAppInstInfo; - } else { - return NULL; - } -} +//SAppInstInfo* getAppInstInfo(const char* clusterKey) { +// SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey)); +// if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) { +// return *ppAppInstInfo; +// } else { +// return NULL; +// } +//} void freeQueryParam(SSyncQueryParam* param) { if (param == NULL) return; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 9cbdbfbd25..9108cae14a 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -17,6 +17,7 @@ #include "clientInt.h" #include "clientLog.h" #include "clientStmt.h" +#include "clientMonitor.h" #include "functionMgt.h" #include "os.h" #include "query.h" @@ -55,6 +56,9 @@ void taos_cleanup(void) { return; } + monitorClose(); + taosHashCleanup(appInfo.pInstMap); + taosHashCleanup(appInfo.pInstMapByClusterId); tscStopCrashReport(); hbMgrCleanUp(); @@ -279,7 +283,6 @@ void taos_close_internal(void *taos) { STscObj *pTscObj = (STscObj *)taos; tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs); - // clientMonitorClose(pTscObj->pAppInfo->instKey); taosRemoveRef(clientConnRefPool, pTscObj->id); } diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c new file mode 100644 index 0000000000..e66884e74e --- /dev/null +++ b/source/client/src/clientMonitor.c @@ -0,0 +1,652 @@ +#include "clientMonitor.h" +#include "os.h" +#include "tmisce.h" +#include "ttime.h" +#include "ttimer.h" +#include 
"tglobal.h" +#include "tqueue.h" +#include "cJSON.h" +#include "clientInt.h" + +SRWLatch monitorLock; +void* monitorTimer; +SHashObj* monitorCounterHash; +int32_t slowLogFlag = -1; +int32_t monitorFlag = -1; +tsem2_t monitorSem; +STaosQueue* monitorQueue; +SHashObj* monitorSlowLogHash; + +static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size){ + if (tsTempDir == NULL) { + return -1; + } + int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir); + if (ret < 0){ + uError("failed to get tmp path ret:%d", ret); + return ret; + } + return 0; +} + +//static void destroyCounter(void* data){ +// if (data == NULL) { +// return; +// } +// taos_counter_t* conuter = *(taos_counter_t**)data; +// if(conuter == NULL){ +// return; +// } +// taos_counter_destroy(conuter); +//} + +static void destroySlowLogClient(void* data){ + if (data == NULL) { + return; + } + SlowLogClient* slowLogClient = *(SlowLogClient**)data; + if(slowLogClient == NULL){ + return; + } + taosTmrStopA(&(*(SlowLogClient**)data)->timer); + + TdFilePtr pFile = slowLogClient->pFile; + if(pFile == NULL){ + taosMemoryFree(slowLogClient); + return; + } + + taosUnLockFile(pFile); + taosCloseFile(&pFile); + taosMemoryFree(slowLogClient); +} + +static void destroyMonitorClient(void* data){ + if (data == NULL) { + return; + } + MonitorClient* pMonitor = *(MonitorClient**)data; + if(pMonitor == NULL){ + return; + } + taosTmrStopA(&pMonitor->timer); + taosHashCleanup(pMonitor->counters); + taos_collector_registry_destroy(pMonitor->registry); +// taos_collector_destroy(pMonitor->colector); + taosMemoryFree(pMonitor); +} + +static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) { + void *p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES); + if(p == NULL){ + uError("failed to get app inst, clusterId:%" PRIx64, clusterId); + return NULL; + } + return *(SAppInstInfo**)p; +} + +static int32_t tscMonitortInit() { + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + 
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
+  TdThread monitorThread;
+  if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) {
+    uError("failed to create monitor thread since %s", strerror(errno));
+    // release the attribute on the failure path too (after errno was logged)
+    taosThreadAttrDestroy(&thAttr);
+    return -1;
+  }
+
+  taosThreadAttrDestroy(&thAttr);
+  return 0;
+}
+
+// Flip slowLogFlag from 0 to 1 and poll every 100 ms until it is no longer
+// positive. Any other starting value is reported as "already stopped".
+static void tscMonitorStop() {
+  if (atomic_val_compare_exchange_32(&slowLogFlag, 0, 1)) {
+    uDebug("monitor thread already stopped");
+    return;
+  }
+
+  while (atomic_load_32(&slowLogFlag) > 0) {
+    taosMsleep(100);
+  }
+}
+
+// Completion callback installed by sendReport: logs a failed send, frees the
+// pData and pEpSet members of pMsg, and returns `code` unchanged.
+static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
+  if (TSDB_CODE_SUCCESS != code) {
+    uError("found error in monitorReport send callback, code:%d, please check the network.", code);
+  }
+  if (pMsg) {
+    taosMemoryFree(pMsg->pData);
+    taosMemoryFree(pMsg->pEpSet);
+  }
+  return code;
+}
+
+// Serialize `pCont` into a TDMT_MND_STATIS message of the given `type` and send
+// it with asyncSendMsgToServer.
+// Returns the send result (TSDB_CODE_SUCCESS when the message was handed over)
+// and a non-zero value when serialization or an allocation fails, so callers
+// that test for 0 never treat unsent data as delivered.
+static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITOR_TYPE type) {
+  SStatisReq sStatisReq = {0};
+  sStatisReq.pCont = pCont;
+  sStatisReq.contLen = strlen(pCont);
+  sStatisReq.type = type;
+
+  // first pass only computes the encoded length
+  int tlen = tSerializeSStatisReq(NULL, 0, &sStatisReq);
+  if (tlen < 0) {
+    uError("sendReport failed, serialize error, len:%d", tlen);
+    return -1;
+  }
+  void* buf = taosMemoryMalloc(tlen);
+  if (buf == NULL) {
+    uError("sendReport failed, out of memory, len:%d", tlen);
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    return -1;
+  }
+  if (tSerializeSStatisReq(buf, tlen, &sStatisReq) < 0) {
+    uError("sendReport failed, serialize error, len:%d", tlen);
+    taosMemoryFree(buf);
+    return -1;
+  }
+
+  SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
+  if (pInfo == NULL) {
+    uError("sendReport failed, out of memory send info");
+    taosMemoryFree(buf);  // not yet owned by a send info, free it here
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    return -1;
+  }
+  pInfo->fp = monitorReportAsyncCB;
+  pInfo->msgInfo.pData = buf;
+  pInfo->msgInfo.len = tlen;
+  pInfo->msgType = TDMT_MND_STATIS;
+  pInfo->paramFreeFp = taosMemoryFree;
+  pInfo->requestId = tGenIdPI64();
+  pInfo->requestObjRefId = 0;
+
+  int64_t transporterId = 0;
+  int32_t code = asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo);
+  if (code != TSDB_CODE_SUCCESS) {
+    uError("sendReport failed, code:%d", code);
+  }
+  return code;
+}
+
+// Read the '\0'-separated records of `pFile` from its start, in chunks of at
+// most SLOW_LOG_SEND_SIZE bytes. The complete records of each chunk are wrapped
+// into "[rec,rec,...]" and sent with sendReport as MONITOR_TYPE_SLOW_LOG; an
+// unterminated tail is carried over to the next chunk.
+// The file is truncated to zero length once everything was read. On a read
+// error or a failed send the function returns without truncating.
+void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *epSet){
+  int32_t offset = 0;
+  // Two buffers of about 1MB each are too large for the stack, keep them on the heap.
+  char* buf = taosMemoryCalloc(1, SLOW_LOG_SEND_SIZE + 1);  // +1 for \0, for print log
+  if (buf == NULL) {
+    uError("failed to allocate read buffer for file:%p", pFile);
+    return;
+  }
+  // A full chunk turns into at most SLOW_LOG_SEND_SIZE + 1 characters
+  // ('[' and ']' added, separators replace terminators), plus the final '\0'.
+  char* pCont = taosMemoryCalloc(1, SLOW_LOG_SEND_SIZE + 3);
+  if (pCont == NULL) {
+    uError("failed to allocate send buffer for file:%p", pFile);
+    taosMemoryFree(buf);
+    return;
+  }
+
+  if(taosLSeekFile(pFile, 0, SEEK_SET) < 0){
+    uError("failed to seek file:%p code: %d", pFile, errno);
+    goto END;
+  }
+  while(1){
+    int64_t readSize = taosReadFile(pFile, buf + offset, SLOW_LOG_SEND_SIZE - offset);
+    if (readSize < 0) {
+      uError("failed to read len from file:%p since %s", pFile, terrstr());
+      goto END;
+    }
+    if (readSize == 0) {
+      // end of file: everything readable was handled, go on to truncate
+      break;
+    }
+
+    char* string = buf;   // start of the record currently being scanned
+    char* out = pCont;    // write position, avoids rescanning pCont per record
+    *out++ = '[';
+    for(int i = 0; i < readSize + offset; i++){
+      if (buf[i] == '\0') {
+        if (string != buf) *out++ = ',';
+        size_t recLen = (size_t)((buf + i) - string);
+        memcpy(out, string, recLen);
+        out += recLen;
+        uDebug("[monitor] monitorReadSendSlowLog slow log:%s", string);
+        string = buf + i + 1;
+      }
+    }
+    *out++ = ']';
+    *out = '\0';
+    if (pTransporter) {
+      if(sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_SLOW_LOG) != 0){
+        if(taosLSeekFile(pFile, -readSize, SEEK_CUR) < 0){
+          uError("failed to seek file:%p code: %d", pFile, errno);
+        }
+        uError("failed to send report:%s", pCont);
+        goto END;
+      }
+      uDebug("[monitor] monitorReadSendSlowLog send slow log to mnode:%s", pCont);
+    }
+
+    if (readSize + offset < SLOW_LOG_SEND_SIZE) {
+      break;
+    }
+    // keep the unterminated tail at the front of buf for the next round
+    offset = SLOW_LOG_SEND_SIZE - (string - buf);
+    if(buf != string && offset != 0){
+      memmove(buf, string, offset);
+      uDebug("[monitor] monitorReadSendSlowLog left slow log:%s", buf);
+    }
+  }
+  if(taosFtruncateFile(pFile, 0) < 0){
+    uError("failed to truncate file:%p code: %d", pFile, errno);
+  }
+  uDebug("[monitor] monitorReadSendSlowLog send slow log file:%p", pFile);
+
+END:
+  taosMemoryFree(pCont);
+  taosMemoryFree(buf);
+}
+
+static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet *epSet) {
+  char ts[50] = {0};
+  sprintf(ts, "%"
PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI)); + char* pCont = (char*)taos_collector_registry_bridge_new(registry, ts, "%" PRId64, NULL); + if(NULL == pCont) { + uError("generateClusterReport failed, get null content."); + return; + } + + if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER) == 0) { + taos_collector_registry_clear_batch(registry); + } + taosMemoryFreeClear(pCont); +} + +static void reportSendProcess(void* param, void* tmrId) { + taosRLockLatch(&monitorLock); + if (atomic_load_32(&monitorFlag) == 1) { + taosRUnLockLatch(&monitorLock); + return; + } + + MonitorClient* pMonitor = (MonitorClient*)param; + SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId); + if(pInst == NULL){ + taosRUnLockLatch(&monitorLock); + return; + } + + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep); + taosRUnLockLatch(&monitorLock); + taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId); +} + +static void sendAllSlowLog(){ + void* data = taosHashIterate(monitorSlowLogHash, NULL); + while (data != NULL) { + TdFilePtr pFile = (*(SlowLogClient**)data)->pFile; + if (pFile != NULL){ + int64_t clusterId = *(int64_t*)taosHashGetKey(data, NULL); + SAppInstInfo* pInst = getAppInstByClusterId(clusterId); + if(pInst == NULL){ + taosHashCancelIterate(monitorSlowLogHash, data); + break; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + monitorReadSendSlowLog(pFile, pInst->pTransporter, &ep); + } + data = taosHashIterate(monitorSlowLogHash, data); + } + uDebug("[monitor] sendAllSlowLog when client close"); +} + +void monitorSendAllSlowLogFromTempDir(void* inst){ + SAppInstInfo* pInst = (SAppInstInfo*)inst; + if(pInst == NULL || !pInst->monitorParas.tsEnableMonitor){ + uInfo("[monitor] monitor is disabled, skip send slow log"); + return; + } + char namePrefix[PATH_MAX] = {0}; + if (snprintf(namePrefix, sizeof(namePrefix), 
"%s%"PRIx64, TD_TMP_FILE_PREFIX, pInst->clusterId) < 0) { + uError("failed to generate slow log file name prefix"); + return; + } + + taosRLockLatch(&monitorLock); + + char tmpPath[PATH_MAX] = {0}; + if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) { + goto END; + } + + TdDirPtr pDir = taosOpenDir(tmpPath); + if (pDir == NULL) { + goto END; + } + + TdDirEntryPtr de = NULL; + while ((de = taosReadDir(pDir)) != NULL) { + if (taosDirEntryIsDir(de)) { + continue; + } + + char *name = taosGetDirEntryName(de); + if (strcmp(name, ".") == 0 || + strcmp(name, "..") == 0 || + strstr(name, namePrefix) == NULL) { + uInfo("skip file:%s, for cluster id:%"PRIx64, name, pInst->clusterId); + continue; + } + + char filename[PATH_MAX] = {0}; + snprintf(filename, sizeof(filename), "%s%s", tmpPath, name); + TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ); + if (pFile == NULL) { + uError("failed to open file:%s since %s", filename, terrstr()); + continue; + } + if (taosLockFile(pFile) < 0) { + uError("failed to lock file:%s since %s, maybe used by other process", filename, terrstr()); + taosCloseFile(&pFile); + continue; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + monitorReadSendSlowLog(pFile, pInst->pTransporter, &ep); + taosUnLockFile(pFile); + taosCloseFile(&pFile); + taosRemoveFile(filename); + uDebug("[monitor] send and delete slow log file when reveive connect rsp:%s", filename); + + } + + taosCloseDir(&pDir); + +END: + taosRUnLockLatch(&monitorLock); +} + +static void sendAllCounter(){ + MonitorClient** ppMonitor = (MonitorClient**)taosHashIterate(monitorCounterHash, NULL); + while (ppMonitor != NULL) { + MonitorClient* pMonitor = *ppMonitor; + if (pMonitor != NULL){ + SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId); + if(pInst == NULL){ + taosHashCancelIterate(monitorCounterHash, ppMonitor); + break; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep); + } + ppMonitor = 
taosHashIterate(monitorCounterHash, ppMonitor); + } +} + +void monitorInit() { + uInfo("[monitor] tscMonitor init"); + monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (monitorCounterHash == NULL) { + uError("failed to create monitorCounterHash"); + } + taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient); + + monitorSlowLogHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (monitorSlowLogHash == NULL) { + uError("failed to create monitorSlowLogHash"); + } + taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient); + + monitorTimer = taosTmrInit(0, 0, 0, "MONITOR"); + if (monitorTimer == NULL) { + uError("failed to create monitor timer"); + } + + taosInitRWLatch(&monitorLock); + tscMonitortInit(); +} + +void monitorClose() { + uInfo("[monitor] tscMonitor close"); + taosRLockLatch(&monitorLock); + + if (atomic_val_compare_exchange_32(&monitorFlag, 0, 1)) { + uDebug("[monitor] monitorFlag is not 0"); + } + tscMonitorStop(); + sendAllSlowLog(); + sendAllCounter(); + taosHashCleanup(monitorCounterHash); + taosHashCleanup(monitorSlowLogHash); + taosTmrCleanUp(monitorTimer); + taosRUnLockLatch(&monitorLock); +} + +void monitorCreateClient(int64_t clusterId) { + MonitorClient* pMonitor = NULL; + taosWLockLatch(&monitorLock); + if (taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES) == NULL) { + uInfo("[monitor] monitorCreateClient for %" PRIx64, clusterId); + pMonitor = taosMemoryCalloc(1, sizeof(MonitorClient)); + if (pMonitor == NULL) { + uError("failed to create monitor client"); + goto fail; + } + pMonitor->clusterId = clusterId; + char clusterKey[32] = {0}; + if(snprintf(clusterKey, sizeof(clusterKey), "%"PRId64, clusterId) < 0){ + uError("failed to create cluster key"); + goto fail; + } + pMonitor->registry = taos_collector_registry_new(clusterKey); + if(pMonitor->registry == NULL){ + uError("failed to create 
registry"); + goto fail; + } + pMonitor->colector = taos_collector_new(clusterKey); + if(pMonitor->colector == NULL){ + uError("failed to create collector"); + goto fail; + } + + taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector); + pMonitor->counters = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (pMonitor->counters == NULL) { + uError("failed to create monitor counters"); + goto fail; + } +// taosHashSetFreeFp(pMonitor->counters, destroyCounter); + + if(taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0){ + uError("failed to put monitor client to hash"); + goto fail; + } + + SAppInstInfo* pInst = getAppInstByClusterId(clusterId); + if(pInst == NULL){ + uError("failed to get app instance by cluster id"); + pMonitor = NULL; + goto fail; + } + pMonitor->timer = taosTmrStart(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, (void*)pMonitor, monitorTimer); + if(pMonitor->timer == NULL){ + uError("failed to start timer"); + goto fail; + } + uInfo("[monitor] monitorCreateClient for %"PRIx64 "finished %p.", clusterId, pMonitor); + } + taosWUnLockLatch(&monitorLock); + if (-1 != atomic_val_compare_exchange_32(&monitorFlag, -1, 0)) { + uDebug("[monitor] monitorFlag already is 0"); + } + return; + +fail: + destroyMonitorClient(&pMonitor); + taosWUnLockLatch(&monitorLock); +} + +void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count, const char** label_keys) { + taosWLockLatch(&monitorLock); + MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES); + if (ppMonitor == NULL || *ppMonitor == NULL) { + uError("failed to get monitor client"); + goto end; + } + taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys); + if (newCounter == NULL) + goto end; /* exit through end: so the write latch on monitorLock is released */ + MonitorClient* pMonitor = *ppMonitor; +
taos_collector_add_metric(pMonitor->colector, newCounter); + if(taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0){ + uError("failed to put counter to monitor"); + taos_counter_destroy(newCounter); + goto end; + } + uInfo("[monitor] monitorCreateClientCounter %"PRIx64"(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, newCounter); + +end: + taosWUnLockLatch(&monitorLock); +} + +void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values) { + taosRLockLatch(&monitorLock); + MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES); + if (ppMonitor == NULL || *ppMonitor == NULL) { + uError("monitorCounterInc not found pMonitor %"PRId64, clusterId); + goto end; + } + + MonitorClient* pMonitor = *ppMonitor; + taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName)); + if (ppCounter == NULL || *ppCounter == NULL) { /* bail out only when the counter is missing; a non-NULL entry is the one to increment */ + uError("monitorCounterInc not found pCounter %"PRIx64":%s.", clusterId, counterName); + goto end; + } + taos_counter_inc(*ppCounter, label_values); + uInfo("[monitor] monitorCounterInc %"PRIx64"(%p):%s", pMonitor->clusterId, pMonitor, counterName); + +end: + taosRUnLockLatch(&monitorLock); +} + +const char* monitorResultStr(SQL_RESULT_CODE code) { + static const char* result_state[] = {"Success", "Failed", "Cancel"}; + return result_state[code]; +} + +void monitorFreeSlowLogData(MonitorSlowLogData* pData) { + if (pData == NULL) { + return; + } + taosMemoryFree(pData->value); +} + +void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); } + +void reportSlowLog(void* param, void* tmrId) { + taosRLockLatch(&monitorLock); + if (atomic_load_32(&monitorFlag) == 1) { + taosRUnLockLatch(&monitorLock); + return; + } + SAppInstInfo* pInst = getAppInstByClusterId((int64_t)param); + if(pInst == NULL){ + uError("failed to get app inst, clusterId:%"PRIx64, (int64_t)param); +
taosRUnLockLatch(&monitorLock); + return; + } + + void* tmp = taosHashGet(monitorSlowLogHash, &param, LONG_BYTES); /* hash key is the cluster id carried in the timer param */ + if(tmp == NULL){ + uError("failed to get file inst, clusterId:%"PRIx64, (int64_t)param); + taosRUnLockLatch(&monitorLock); + return; + } + + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + monitorReadSendSlowLog((*(SlowLogClient**)tmp)->pFile, pInst->pTransporter, &ep); + taosRUnLockLatch(&monitorLock); + + taosTmrReset(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId); +} + +void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpPath){ + taosRLockLatch(&monitorLock); + TdFilePtr pFile = NULL; + void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES); + if (tmp == NULL){ + char path[PATH_MAX] = {0}; + char clusterId[32] = {0}; + if (snprintf(clusterId, sizeof(clusterId), "%" PRIx64, slowLogData->clusterId) < 0){ + uError("failed to generate clusterId:%" PRIx64, slowLogData->clusterId); + goto FAILED; + } + taosGetTmpfilePath(tmpPath, clusterId, path); + uInfo("[monitor] create slow log file:%s", path); + pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); + if (pFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to open file:%s since %s", path, terrstr()); + goto FAILED; + } + + SlowLogClient *pClient = taosMemoryCalloc(1, sizeof(SlowLogClient)); + if (pClient == NULL){ + uError("failed to allocate memory for slow log client"); + taosCloseFile(&pFile); + goto FAILED; + } + pClient->pFile = pFile; + if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0){ + uError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId); + taosCloseFile(&pFile); + taosMemoryFree(pClient); + goto FAILED; + } + + if(taosLockFile(pFile) < 0){ + uError("failed to lock file:%p since %s", pFile, terrstr()); + goto FAILED; + } + + SAppInstInfo* pInst =
getAppInstByClusterId(slowLogData->clusterId); + if(pInst == NULL){ + uError("failed to get app instance by clusterId:%" PRId64, slowLogData->clusterId); + goto FAILED; + } + + pClient->timer = taosTmrStart(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, (void*)slowLogData->clusterId, monitorTimer); + }else{ + pFile = (*(SlowLogClient**)tmp)->pFile; + } + + if (taosWriteFile(pFile, slowLogData->value, strlen(slowLogData->value) + 1) < 0){ + uError("failed to write len to file:%p since %s", pFile, terrstr()); + } + uDebug("[monitor] write slow log to file:%p, clusterId:%"PRIx64, pFile, slowLogData->clusterId); + +FAILED: + taosRUnLockLatch(&monitorLock); +} + +void* monitorThreadFunc(void *param){ + setThreadName("client-monitor-slowlog"); + +#ifdef WINDOWS + if (taosCheckCurrentInDll()) { + atexit(monitorThreadFuncUnexpectedStopped); + } +#endif + + if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) { + return NULL; + } + + char tmpPath[PATH_MAX] = {0}; + if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0){ + return NULL; + } + + if (taosMulModeMkDir(tmpPath, 0777, true) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + printf("failed to create dir:%s since %s", tmpPath, terrstr()); + return NULL; + } + + if (tsem2_init(&monitorSem, 0, 0) != 0) { + uError("sem init error since %s", terrstr()); + return NULL; + } + + monitorQueue = taosOpenQueue(); + if(monitorQueue == NULL){ + uError("open queue error since %s", terrstr()); + return NULL; + } + while (1) { + if (slowLogFlag > 0) break; + + MonitorSlowLogData* slowLogData = NULL; + taosReadQitem(monitorQueue, (void**)&slowLogData); + if (slowLogData != NULL) { + uDebug("[monitor] read slow log data from queue, clusterId:%" PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value); + monitorWriteSlowLog2File(slowLogData, tmpPath); + } + monitorFreeSlowLogData(slowLogData); + taosFreeQitem(slowLogData); + tsem2_timewait(&monitorSem, 500); + } + + taosCloseQueue(monitorQueue); + 
tsem2_destroy(&monitorSem); + slowLogFlag = -2; + return NULL; +} \ No newline at end of file diff --git a/source/client/src/slowQueryMonitor.c b/source/client/src/clientMonitorSlow.c similarity index 64% rename from source/client/src/slowQueryMonitor.c rename to source/client/src/clientMonitorSlow.c index b41343443d..192792f43e 100644 --- a/source/client/src/slowQueryMonitor.c +++ b/source/client/src/clientMonitorSlow.c @@ -21,7 +21,6 @@ const char* slowQueryName = "taos_slow_sql:count"; const char* slowQueryHelp = "slow query log when cost over than config duration"; const int slowQueryLabelCount = 4; const char* slowQueryLabels[] = {"cluster_id", "username", "result", "duration"}; -static const char* defaultClusterID = ""; const int64_t usInSeconds = 1000 * 1000; const int64_t msInMinutes = 60 * 1000; @@ -39,21 +38,21 @@ static const char* getSlowQueryLableCostDesc(int64_t cost) { return "0-3s"; } -void clientSlowQueryMonitorInit(const char* clusterKey) { - if (!tsEnableMonitor) return; - SAppInstInfo* pAppInstInfo = getAppInstInfo(clusterKey); - SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); - clusterMonitorInit(clusterKey, epSet, pAppInstInfo->pTransporter); - createClusterCounter(clusterKey, slowQueryName, slowQueryHelp, slowQueryLabelCount, slowQueryLabels); +void monitorClientSlowQueryInit(int64_t clusterid) { + monitorCreateClient(clusterid); + monitorCreateClientCounter(clusterid, slowQueryName, slowQueryHelp, slowQueryLabelCount, slowQueryLabels); } -void clientSlowQueryLog(const char* clusterKey, const char* user, SQL_RESULT_CODE result, int32_t cost) { - const char* slowQueryLabelValues[] = {defaultClusterID, user, resultStr(result), getSlowQueryLableCostDesc(cost)}; - taosClusterCounterInc(clusterKey, slowQueryName, slowQueryLabelValues); +void clientSlowQueryLog(int64_t clusterId, const char* user, SQL_RESULT_CODE result, int32_t cost) { + char clusterIdStr[32] = {0}; + if (snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId) < 0){ 
+ uError("failed to generate clusterId:%" PRId64, clusterId); + } + const char* slowQueryLabelValues[] = {clusterIdStr, user, monitorResultStr(result), getSlowQueryLableCostDesc(cost)}; + monitorCounterInc(clusterId, slowQueryName, slowQueryLabelValues); } -void SlowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost) { - if (!tsEnableMonitor) return; +void slowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost) { SQL_RESULT_CODE result = SQL_RESULT_SUCCESS; if (TSDB_CODE_SUCCESS != code) { result = SQL_RESULT_FAILED; @@ -66,12 +65,12 @@ void SlowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost) { STscObj* pTscObj = acquireTscObj(rid); if (pTscObj != NULL) { if(pTscObj->pAppInfo == NULL) { - tscLog("SlowQueryLog, not found pAppInfo"); + tscLog("slowQueryLog, not found pAppInfo"); } else { - clientSlowQueryLog(pTscObj->pAppInfo->instKey, pTscObj->user, result, cost); + clientSlowQueryLog(pTscObj->pAppInfo->clusterId, pTscObj->user, result, cost); } releaseTscObj(rid); } else { - tscLog("SlowQueryLog, not found rid"); + tscLog("slowQueryLog, not found rid"); } } diff --git a/source/client/src/clientSqlMonitor.c b/source/client/src/clientMonitorSql.c similarity index 64% rename from source/client/src/clientSqlMonitor.c rename to source/client/src/clientMonitorSql.c index 572af7ff55..19d5b7506e 100644 --- a/source/client/src/clientSqlMonitor.c +++ b/source/client/src/clientMonitorSql.c @@ -22,17 +22,12 @@ const char* selectMonitorHelp = "count for select sql"; const int selectMonitorLabelCount = 4; const char* selectMonitorLabels[] = {"cluster_id", "sql_type", "username", "result"}; -static const char* defaultClusterID = ""; - -void clientSQLReqMonitorInit(const char* clusterKey) { - if (!tsEnableMonitor) return; - SAppInstInfo* pAppInstInfo = getAppInstInfo(clusterKey); - SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); - clusterMonitorInit(clusterKey, epSet, pAppInstInfo->pTransporter); - createClusterCounter(clusterKey, 
selectMonitorName, selectMonitorHelp, selectMonitorLabelCount, selectMonitorLabels); +void monitorClientSQLReqInit(int64_t clusterId) { + monitorCreateClient(clusterId); + monitorCreateClientCounter(clusterId, selectMonitorName, selectMonitorHelp, selectMonitorLabelCount, selectMonitorLabels); } -void clientSQLReqLog(const char* clusterKey, const char* user, SQL_RESULT_CODE result, int8_t type) { +void clientSQLReqLog(int64_t clusterId, const char* user, SQL_RESULT_CODE result, int8_t type) { const char* typeStr; switch (type) { case MONITORSQLTYPEDELETE: @@ -45,12 +40,15 @@ void clientSQLReqLog(const char* clusterKey, const char* user, SQL_RESULT_CODE r typeStr = "select"; break; } - const char* selectMonitorLabelValues[] = {defaultClusterID, typeStr, user, resultStr(result)}; - taosClusterCounterInc(clusterKey, selectMonitorName, selectMonitorLabelValues); + char clusterIdStr[32] = {0}; + if (snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId) < 0){ + uError("failed to generate clusterId:%" PRId64, clusterId); + } + const char* selectMonitorLabelValues[] = {clusterIdStr, typeStr, user, monitorResultStr(result)}; + monitorCounterInc(clusterId, selectMonitorName, selectMonitorLabelValues); } void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type) { - if (!tsEnableMonitor) return; SQL_RESULT_CODE result = SQL_RESULT_SUCCESS; if (TSDB_CODE_SUCCESS != code) { result = SQL_RESULT_FAILED; @@ -65,15 +63,10 @@ void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type) { if (pTscObj->pAppInfo == NULL) { tscLog("sqlReqLog, not found pAppInfo"); } else { - clientSQLReqLog(pTscObj->pAppInfo->instKey, pTscObj->user, result, type); + clientSQLReqLog(pTscObj->pAppInfo->clusterId, pTscObj->user, result, type); } releaseTscObj(rid); } else { tscLog("sqlReqLog, not found rid"); } } - -void clientMonitorClose(const char* clusterKey) { - tscLog("clientMonitorClose, key:%s", clusterKey); - clusterMonitorClose(clusterKey); -} diff --git 
a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index f04e5e53c6..8c917a7534 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -15,6 +15,7 @@ #include "catalog.h" #include "clientInt.h" +#include "clientMonitor.h" #include "clientLog.h" #include "cmdnodes.h" #include "os.h" @@ -140,6 +141,9 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { // update the appInstInfo pTscObj->pAppInfo->clusterId = connectRsp.clusterId; + pTscObj->pAppInfo->monitorParas = connectRsp.monitorParas; + tscDebug("[monitor] paras from connect rsp, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", + connectRsp.clusterId, connectRsp.monitorParas.tsSlowLogThreshold, connectRsp.monitorParas.tsSlowLogScope); lastClusterId = connectRsp.clusterId; pTscObj->connType = connectRsp.connType; @@ -147,6 +151,15 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { pTscObj->authVer = connectRsp.authVer; pTscObj->whiteListInfo.ver = connectRsp.whiteListVer; + if(taosHashGet(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES) == NULL){ + if(taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo, POINTER_BYTES) != 0){ + tscError("failed to put appInfo into appInfo.pInstMapByClusterId"); + } + monitorSendAllSlowLogFromTempDir(pTscObj->pAppInfo); + monitorClientSlowQueryInit(connectRsp.clusterId); + monitorClientSQLReqInit(connectRsp.clusterId); + } + taosThreadMutexLock(&clientHbMgr.lock); SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx); if (pAppHbMgr) { @@ -233,7 +246,7 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { struct SCatalog* pCatalog = NULL; if (usedbRsp.vgVersion >= 0) { // cached in local - uint64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId; + int64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId; int32_t code1 = catalogGetHandle(clusterId, 
&pCatalog); if (code1 != TSDB_CODE_SUCCESS) { tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId, diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 6eb7abe0eb..a771fc1635 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1851,7 +1851,7 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL } void smlSetReqSQL(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd) { - if (tsSlowLogScope & SLOW_LOG_TYPE_INSERT) { + if (request->pTscObj->pAppInfo->monitorParas.tsSlowLogScope & SLOW_LOG_TYPE_INSERT) { int32_t len = 0; int32_t rlen = 0; char *p = NULL; diff --git a/source/client/test/clientMonitorTests.cpp b/source/client/test/clientMonitorTests.cpp index f7ddc1a5cd..c74f4f7290 100644 --- a/source/client/test/clientMonitorTests.cpp +++ b/source/client/test/clientMonitorTests.cpp @@ -35,34 +35,34 @@ int main(int argc, char** argv) { return RUN_ALL_TESTS(); } -TEST(clientMonitorTest, monitorTest) { - const char* cluster1 = "cluster1"; - const char* cluster2 = "cluster2"; - SEpSet epSet; - clusterMonitorInit(cluster1, epSet, NULL); - const char* counterName1 = "slow_query"; - const char* counterName2 = "select_count"; - const char* help1 = "test for slowQuery"; - const char* help2 = "test for selectSQL"; - const char* lables[] = {"lable1"}; - taos_counter_t* c1 = createClusterCounter(cluster1, counterName1, help1, 1, lables); - ASSERT_TRUE(c1 != NULL); - taos_counter_t* c2 = createClusterCounter(cluster1, counterName2, help2, 1, lables); - ASSERT_TRUE(c2 != NULL); - ASSERT_TRUE(c1 != c2); - taos_counter_t* c21 = createClusterCounter(cluster2, counterName1, help2, 1, lables); - ASSERT_TRUE(c21 == NULL); - clusterMonitorInit(cluster2, epSet, NULL); - c21 = createClusterCounter(cluster2, counterName1, help2, 1, lables); - ASSERT_TRUE(c21 != NULL); - int i = 0; - while (i < 12) { - taosMsleep(10); - ++i; - } - 
clusterMonitorClose(cluster1); - clusterMonitorClose(cluster2); -} +//TEST(clientMonitorTest, monitorTest) { +// const char* cluster1 = "cluster1"; +// const char* cluster2 = "cluster2"; +// SEpSet epSet; +// clientMonitorInit(cluster1, epSet, NULL); +// const char* counterName1 = "slow_query"; +// const char* counterName2 = "select_count"; +// const char* help1 = "test for slowQuery"; +// const char* help2 = "test for selectSQL"; +// const char* lables[] = {"lable1"}; +// taos_counter_t* c1 = createClusterCounter(cluster1, counterName1, help1, 1, lables); +// ASSERT_TRUE(c1 != NULL); +// taos_counter_t* c2 = createClusterCounter(cluster1, counterName2, help2, 1, lables); +// ASSERT_TRUE(c2 != NULL); +// ASSERT_TRUE(c1 != c2); +// taos_counter_t* c21 = createClusterCounter(cluster2, counterName1, help2, 1, lables); +// ASSERT_TRUE(c21 == NULL); +// clientMonitorInit(cluster2, epSet, NULL); +// c21 = createClusterCounter(cluster2, counterName1, help2, 1, lables); +// ASSERT_TRUE(c21 != NULL); +// int i = 0; +// while (i < 12) { +// taosMsleep(10); +// ++i; +// } +// clusterMonitorClose(cluster1); +// clusterMonitorClose(cluster2); +//} TEST(clientMonitorTest, sendTest) { TAOS* taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0); @@ -70,13 +70,100 @@ TEST(clientMonitorTest, sendTest) { printf("connect taosd sucessfully.\n"); int64_t rid = *(int64_t *)taos; - SlowQueryLog(rid, false, -1, 1000); + slowQueryLog(rid, false, -1, 1000); int i = 0; while (i < 20) { - SlowQueryLog(rid, false, 0, i * 1000); + slowQueryLog(rid, false, 0, i * 1000); taosMsleep(10); ++i; } taos_close(taos); } + +TEST(clientMonitorTest, ReadOneFile) { + // Create a TdFilePtr object and set it up for testing + + TdFilePtr pFile = taosOpenFile("./tdengine-1-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); + if (pFile == NULL) { + uError("failed to open file:./test.txt since %s", terrstr()); + return; + } + + const int batch = 10; + const int size = 
SLOW_LOG_SEND_SIZE/batch; + for(int i = 0; i < batch + 1; i++){ + char value[size] = {0}; + memset(value, '0' + i, size - 1); + if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ + uError("failed to write len to file:%p since %s", pFile, terrstr()); + } + } + + // Create a void pointer and set it up for testing + void* pTransporter = NULL; + + // Create an SEpSet object and set it up for testing + SEpSet* epSet = NULL; + + // Call the function to be tested + monitorReadSendSlowLog(pFile, pTransporter, epSet); + + char value[size] = {0}; + memset(value, '0', size - 1); + if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ + uError("failed to write len to file:%p since %s", pFile, terrstr()); + } + + monitorReadSendSlowLog(pFile, pTransporter, epSet); + + // Clean up any resources created for testing + taosCloseFile(&pFile); +} + +TEST(clientMonitorTest, ReadTwoFile) { + // Create a TdFilePtr object and set it up for testing + + TdFilePtr pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-1-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); + if (pFile == NULL) { + uError("failed to open file:./test.txt since %s", terrstr()); + return; + } + + const int batch = 10; + const int size = SLOW_LOG_SEND_SIZE/batch; + for(int i = 0; i < batch + 1; i++){ + char value[size] = {0}; + memset(value, '0' + i, size - 1); + if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ + uError("failed to write len to file:%p since %s", pFile, terrstr()); + } + } + + taosFsyncFile(pFile); + taosCloseFile(&pFile); + + pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-2-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); + if (pFile == NULL) { + uError("failed to open file:./test.txt since %s", terrstr()); + return; + } + + for(int i = 0; i < batch + 1; i++){ + char value[size] = {0}; + memset(value, '0' + i, size - 1); + if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ + uError("failed to 
write len to file:%p since %s", pFile, terrstr()); + } + } + + taosFsyncFile(pFile); + taosCloseFile(&pFile); + + SAppInstInfo pAppInfo = {0}; + pAppInfo.clusterId = 2; + pAppInfo.monitorParas.tsEnableMonitor = 1; + strcpy(tsTempDir,"/tmp"); + monitorSendAllSlowLogFromTempDir(&pAppInfo); + +} \ No newline at end of file diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index bbd759b3d1..611891e298 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1520,6 +1520,13 @@ TEST(clientCase, sub_tb_test) { } TEST(clientCase, sub_tb_mt_test) { + char *user = NULL; + char *auth = NULL; + char *ip = NULL; + int port = 0; + char key[512] = {0}; + snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port); + taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); TdThread qid[20] = {0}; @@ -1532,4 +1539,38 @@ TEST(clientCase, sub_tb_mt_test) { } } +//static void concatStrings(SArray *list, char* buf, int size){ +// int len = 0; +// for(int i = 0; i < taosArrayGetSize(list); i++){ +// char* db = (char*)taosArrayGet(list, i); +// char* dot = strchr(db, '.'); +// if (dot != NULL) { +// db = dot + 1; +// } +// if (i != 0){ +// strcat(buf, ","); +// len += 1; +// } +// int ret = snprintf(buf + len, size - len, "%s", db); +// if (ret < 0) { +// printf("snprintf failed, buf:%s, ret:%d", buf, ret); +// break; +// } +// len += ret; +// if (len >= size){ +// printf("dbList is truncated, buf:%s, len:%d", buf, len); +// break; +// } +// } +//} +// +//TEST(clientCase, concat_string_test) { +// SArray* list = taosArrayInit(10, TSDB_DB_FNAME_LEN); +// taosArrayPush(list, "1.db1"); +// taosArrayPush(list, "2.db2"); +// +// char buf[32] = {0}; +// concatStrings(list, buf, sizeof(buf) - 1); +//} + #pragma GCC diagnostic pop diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index a79ee03ab1..cba852b48f 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -179,8 +179,12 @@ 
int32_t tsRedirectMaxPeriod = 1000; int32_t tsMaxRetryWaitTime = 10000; bool tsUseAdapter = false; int32_t tsMetaCacheMaxSize = -1; // MB -int32_t tsSlowLogThreshold = 3; // seconds -int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL; +int32_t tsSlowLogThreshold = 10; // seconds +int32_t tsSlowLogThresholdTest = 10; // seconds +char tsSlowLogExceptDb[TSDB_DB_NAME_LEN] = ""; // seconds +int32_t tsSlowLogScope = SLOW_LOG_TYPE_QUERY; +char* tsSlowLogScopeString = "query"; +int32_t tsSlowLogMaxLen = 4096; int32_t tsTimeSeriesThreshold = 50; bool tsMultiResultFunctionStarReturnTags = false; @@ -543,9 +547,6 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { return -1; if (cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; - if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 0, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) - return -1; - if (cfgAddString(pCfg, "slowLogScope", "", CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; tsNumOfRpcThreads = tsNumOfCores / 2; tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS); @@ -570,9 +571,6 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { return -1; if (cfgAddBool(pCfg, "experimental", tsExperimental, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1; - if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1; - if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; - if (cfgAddBool(pCfg, "multiResultFunctionStarReturnTags", tsMultiResultFunctionStarReturnTags, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; @@ -702,6 +700,15 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt64(pCfg, "mndLogRetention", tsMndLogRetention, 500, 10000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "skipGrant", tsMndSkipGrant, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddBool(pCfg, "monitor", 
tsEnableMonitor, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 86400, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + + if (cfgAddInt32(pCfg, "slowLogThresholdTest", tsSlowLogThresholdTest, 0, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 1, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "slowLogMaxLen", tsSlowLogMaxLen, 1, 16384, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + if (cfgAddString(pCfg, "slowLogScope", tsSlowLogScopeString, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + if (cfgAddString(pCfg, "slowLogExceptDb", tsSlowLogExceptDb, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "monitorPort", tsMonitorPort, 1, 65056, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; @@ -966,40 +973,52 @@ static void taosSetServerLogCfg(SConfig *pCfg) { sndDebugFlag = cfgGetItem(pCfg, "sndDebugFlag")->i32; } -static int32_t taosSetSlowLogScope(const char *pScope) { +static int32_t taosSetSlowLogScope(char *pScope) { if (NULL == pScope || 0 == strlen(pScope)) { - tsSlowLogScope = SLOW_LOG_TYPE_ALL; - return 0; + return SLOW_LOG_TYPE_QUERY; } - if (0 == strcasecmp(pScope, "all")) { - tsSlowLogScope = SLOW_LOG_TYPE_ALL; - return 0; + int32_t slowScope = 0; + + char* scope = NULL; + char *tmp = NULL; + while((scope = strsep(&pScope, "|")) != NULL){ + taosMemoryFreeClear(tmp); + tmp = strdup(scope); + strtrim(tmp); + if (0 == strcasecmp(tmp, "all")) { + slowScope |= SLOW_LOG_TYPE_ALL; + continue; + } + + if (0 == strcasecmp(tmp, "query")) { + slowScope |= SLOW_LOG_TYPE_QUERY; + continue; + } + + if (0 == strcasecmp(tmp, "insert")) 
{ + slowScope |= SLOW_LOG_TYPE_INSERT; + continue; + } + + if (0 == strcasecmp(tmp, "others")) { + slowScope |= SLOW_LOG_TYPE_OTHERS; + continue; + } + + if (0 == strcasecmp(tmp, "none")) { + slowScope |= SLOW_LOG_TYPE_NULL; + continue; + } + + uError("Invalid slowLog scope value:%s", tmp); /* log the rejected token before freeing it; strsep has already advanced pScope (NULL on the last token) */ + taosMemoryFreeClear(tmp); + terrno = TSDB_CODE_INVALID_CFG_VALUE; + return -1; } - if (0 == strcasecmp(pScope, "query")) { - tsSlowLogScope = SLOW_LOG_TYPE_QUERY; - return 0; - } - - if (0 == strcasecmp(pScope, "insert")) { - tsSlowLogScope = SLOW_LOG_TYPE_INSERT; - return 0; - } - - if (0 == strcasecmp(pScope, "others")) { - tsSlowLogScope = SLOW_LOG_TYPE_OTHERS; - return 0; - } - - if (0 == strcasecmp(pScope, "none")) { - tsSlowLogScope = 0; - return 0; - } - - uError("Invalid slowLog scope value:%s", pScope); - terrno = TSDB_CODE_INVALID_CFG_VALUE; - return -1; + taosMemoryFreeClear(tmp); + return slowScope; } // for common configs @@ -1057,12 +1076,6 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; tsQueryMaxConcurrentTables = cfgGetItem(pCfg, "queryMaxConcurrentTables")->i64; tsMetaCacheMaxSize = cfgGetItem(pCfg, "metaCacheMaxSize")->i32; - tsSlowLogThreshold = cfgGetItem(pCfg, "slowLogThreshold")->i32; - tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; - tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; - if (taosSetSlowLogScope(cfgGetItem(pCfg, "slowLogScope")->str)) { - return -1; - } tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32; tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32; @@ -1136,6 +1149,16 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsSIMDEnable = (bool)cfgGetItem(pCfg, "simdEnable")->bval; tsTagFilterCache = (bool)cfgGetItem(pCfg, "tagFilterCache")->bval; + tstrncpy(tsSlowLogExceptDb, cfgGetItem(pCfg, "slowLogExceptDb")->str, TSDB_DB_NAME_LEN); + tsSlowLogThresholdTest = cfgGetItem(pCfg, "slowLogThresholdTest")->i32; +
tsSlowLogThreshold = cfgGetItem(pCfg, "slowLogThreshold")->i32; + tsSlowLogMaxLen = cfgGetItem(pCfg, "slowLogMaxLen")->i32; + int32_t scope = taosSetSlowLogScope(cfgGetItem(pCfg, "slowLogScope")->str); + if(scope < 0){ + return -1; + } + tsSlowLogScope = scope; + tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; tstrncpy(tsMonitorFqdn, cfgGetItem(pCfg, "monitorFqdn")->str, TSDB_FQDN_LEN); @@ -1423,8 +1446,7 @@ static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize, char *name = pItem->name; for (int32_t d = 0; d < optionSize; ++d) { const char *optName = pOptions[d].optionName; - int32_t optLen = strlen(optName); - if (strncasecmp(name, optName, optLen) != 0) continue; + if (strcasecmp(name, optName) != 0) continue; switch (pItem->dtype) { case CFG_DTYPE_BOOL: { int32_t flag = pItem->i32; @@ -1497,6 +1519,23 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { return 0; } + if (strcasecmp("slowLogScope", name) == 0) { + int32_t scope = taosSetSlowLogScope(pItem->str); + if(scope < 0){ + cfgUnLock(pCfg); + return -1; + } + tsSlowLogScope = scope; + cfgUnLock(pCfg); + return 0; + } + + if (strcasecmp("slowLogExceptDb", name) == 0) { + tstrncpy(tsSlowLogExceptDb, pItem->str, TSDB_DB_NAME_LEN); + cfgUnLock(pCfg); + return 0; + } + { // 'bool/int32_t/int64_t/float/double' variables with general modification function static OptionNameAndVar debugOptions[] = { {"dDebugFlag", &dDebugFlag}, {"vDebugFlag", &vDebugFlag}, {"mDebugFlag", &mDebugFlag}, @@ -1514,6 +1553,10 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"enableWhiteList", &tsEnableWhiteList}, {"telemetryReporting", &tsEnableTelem}, {"monitor", &tsEnableMonitor}, + {"monitorInterval", &tsMonitorInterval}, + {"slowLogThreshold", &tsSlowLogThreshold}, + {"slowLogThresholdTest", &tsSlowLogThresholdTest}, + {"slowLogMaxLen", &tsSlowLogMaxLen}, {"mndSdbWriteDelta", 
&tsMndSdbWriteDelta}, {"minDiskFreeSize", &tsMinDiskFreeSize}, @@ -1656,10 +1699,6 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { tsLogSpace.reserved = (int64_t)(((double)pItem->fval) * 1024 * 1024 * 1024); uInfo("%s set to %" PRId64, name, tsLogSpace.reserved); matched = true; - } else if (strcasecmp("monitor", name) == 0) { - tsEnableMonitor = pItem->bval; - uInfo("%s set to %d", name, tsEnableMonitor); - matched = true; } break; } @@ -1703,13 +1742,6 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype, false); uInfo("localEp set to '%s', tsFirst set to '%s'", tsLocalEp, tsFirst); matched = true; - } else if (strcasecmp("slowLogScope", name) == 0) { - if (taosSetSlowLogScope(pItem->str)) { - cfgUnLock(pCfg); - return -1; - } - uInfo("%s set to %s", name, pItem->str); - matched = true; } break; } @@ -1770,7 +1802,6 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { {"queryUseNodeAllocator", &tsQueryUseNodeAllocator}, {"smlDot2Underline", &tsSmlDot2Underline}, {"shellActivityTimer", &tsShellActivityTimer}, - {"slowLogThreshold", &tsSlowLogThreshold}, {"useAdapter", &tsUseAdapter}, {"experimental", &tsExperimental}, {"multiResultFunctionStarReturnTags", &tsMultiResultFunctionStarReturnTags}, diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index fa1ff00757..65ac34c390 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -69,6 +69,28 @@ pReq->sql = NULL; \ } while (0) +static int32_t tSerializeSMonitorParas(SEncoder *encoder, const SMonitorParas* pMonitorParas) { + if (tEncodeI8(encoder, pMonitorParas->tsEnableMonitor) < 0) return -1; + if (tEncodeI32(encoder, pMonitorParas->tsMonitorInterval) < 0) return -1; + if (tEncodeI32(encoder, pMonitorParas->tsSlowLogScope) < 0) return -1; + if (tEncodeI32(encoder, pMonitorParas->tsSlowLogMaxLen) < 0) return -1; + if 
(tEncodeI32(encoder, pMonitorParas->tsSlowLogThreshold) < 0) return -1; + if (tEncodeI32(encoder, pMonitorParas->tsSlowLogThresholdTest) < 0) return -1; + if (tEncodeCStr(encoder, pMonitorParas->tsSlowLogExceptDb) < 0) return -1; + return 0; +} + +static int32_t tDeserializeSMonitorParas(SDecoder *decoder, SMonitorParas* pMonitorParas){ + if (tDecodeI8(decoder, (int8_t *)&pMonitorParas->tsEnableMonitor) < 0) return -1; + if (tDecodeI32(decoder, &pMonitorParas->tsMonitorInterval) < 0) return -1; + if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogScope) < 0) return -1; + if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogMaxLen) < 0) return -1; + if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogThreshold) < 0) return -1; + if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogThresholdTest) < 0) return -1; + if (tDecodeCStrTo(decoder, pMonitorParas->tsSlowLogExceptDb) < 0) return -1; + return 0; +} + static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq); static int32_t tDecodeSBatchDeleteReqCommon(SDecoder *pDecoder, SBatchDeleteReq *pReq); static int32_t tEncodeTableTSMAInfoRsp(SEncoder *pEncoder, const STableTSMAInfoRsp *pRsp); @@ -519,6 +541,7 @@ int32_t tSerializeSClientHbBatchRsp(void *buf, int32_t bufLen, const SClientHbBa SClientHbRsp *pRsp = taosArrayGet(pBatchRsp->rsps, i); if (tSerializeSClientHbRsp(&encoder, pRsp) < 0) return -1; } + if (tSerializeSMonitorParas(&encoder, &pBatchRsp->monitorParas) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -546,6 +569,10 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR taosArrayPush(pBatchRsp->rsps, &rsp); } + if (!tDecodeIsEnd(&decoder)) { + if (tDeserializeSMonitorParas(&decoder, &pBatchRsp->monitorParas) < 0) return -1; + } + tEndDecode(&decoder); tDecoderClear(&decoder); return 0; @@ -1308,6 +1335,9 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { } if (tEncodeI64(&encoder, pReq->ipWhiteVer) < 0) return -1; + + 
if (tSerializeSMonitorParas(&encoder, &pReq->clusterCfg.monitorParas) < 0) return -1; + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -1426,6 +1456,10 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tDecodeI64(&decoder, &pReq->ipWhiteVer) < 0) return -1; } + if (!tDecodeIsEnd(&decoder)) { + if (tDeserializeSMonitorParas(&decoder, &pReq->clusterCfg.monitorParas) < 0) return -1; + } + tEndDecode(&decoder); tDecoderClear(&decoder); return 0; @@ -1521,6 +1555,7 @@ int32_t tSerializeSStatisReq(void *buf, int32_t bufLen, SStatisReq *pReq) { if (tEncodeI32(&encoder, pReq->contLen) < 0) return -1; if (tEncodeCStr(&encoder, pReq->pCont) < 0) return -1; + if (tEncodeI8(&encoder, pReq->type) < 0) return -1; tEndEncode(&encoder); @@ -1541,7 +1576,9 @@ int32_t tDeserializeSStatisReq(void *buf, int32_t bufLen, SStatisReq *pReq) { if (pReq->pCont == NULL) return -1; if (tDecodeCStrTo(&decoder, pReq->pCont) < 0) return -1; } - + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI8(&decoder, (int8_t*)&pReq->type) < 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); return 0; @@ -5153,6 +5190,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tEncodeI32(&encoder, pRsp->passVer) < 0) return -1; if (tEncodeI32(&encoder, pRsp->authVer) < 0) return -1; if (tEncodeI64(&encoder, pRsp->whiteListVer) < 0) return -1; + if (tSerializeSMonitorParas(&encoder, &pRsp->monitorParas) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -5194,6 +5232,9 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { } else { pRsp->whiteListVer = 0; } + if (!tDecodeIsEnd(&decoder)) { + if (tDeserializeSMonitorParas(&decoder, &pRsp->monitorParas) < 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/common/test/commonTests.cpp b/source/common/test/commonTests.cpp index d30b26e564..197ccdb6ca 100644 --- a/source/common/test/commonTests.cpp +++ 
b/source/common/test/commonTests.cpp @@ -726,4 +726,118 @@ TEST(AlreadyAddGroupIdTest, GroupIdAddedWithDifferentLength) { EXPECT_FALSE(result); } +#define SLOW_LOG_TYPE_NULL 0x0 +#define SLOW_LOG_TYPE_QUERY 0x1 +#define SLOW_LOG_TYPE_INSERT 0x2 +#define SLOW_LOG_TYPE_OTHERS 0x4 +#define SLOW_LOG_TYPE_ALL 0x7 + +static int32_t taosSetSlowLogScope(char *pScope) { + if (NULL == pScope || 0 == strlen(pScope)) { + return SLOW_LOG_TYPE_QUERY; + } + + int32_t slowScope = 0; + + char* scope = NULL; + char *tmp = NULL; + while((scope = strsep(&pScope, "|")) != NULL){ + taosMemoryFreeClear(tmp); + tmp = strdup(scope); + strtrim(tmp); + if (0 == strcasecmp(tmp, "all")) { + slowScope |= SLOW_LOG_TYPE_ALL; + continue; + } + + if (0 == strcasecmp(tmp, "query")) { + slowScope |= SLOW_LOG_TYPE_QUERY; + continue; + } + + if (0 == strcasecmp(tmp, "insert")) { + slowScope |= SLOW_LOG_TYPE_INSERT; + continue; + } + + if (0 == strcasecmp(tmp, "others")) { + slowScope |= SLOW_LOG_TYPE_OTHERS; + continue; + } + + if (0 == strcasecmp(tmp, "none")) { + slowScope |= SLOW_LOG_TYPE_NULL; + continue; + } + + taosMemoryFreeClear(tmp); + uError("Invalid slowLog scope value:%s", pScope); + terrno = TSDB_CODE_INVALID_CFG_VALUE; + return -1; + } + + taosMemoryFreeClear(tmp); + return slowScope; +} + +TEST(TaosSetSlowLogScopeTest, NullPointerInput) { + char *pScope = NULL; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, SLOW_LOG_TYPE_QUERY); +} + +TEST(TaosSetSlowLogScopeTest, EmptyStringInput) { + char pScope[1] = ""; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, SLOW_LOG_TYPE_QUERY); +} + +TEST(TaosSetSlowLogScopeTest, AllScopeInput) { + char pScope[] = "all"; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, SLOW_LOG_TYPE_ALL); +} + +TEST(TaosSetSlowLogScopeTest, QueryScopeInput) { + char pScope[] = " query"; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, SLOW_LOG_TYPE_QUERY); +} + +TEST(TaosSetSlowLogScopeTest, 
InsertScopeInput) { + char pScope[] = "insert"; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, SLOW_LOG_TYPE_INSERT); +} + +TEST(TaosSetSlowLogScopeTest, OthersScopeInput) { + char pScope[] = "others"; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, SLOW_LOG_TYPE_OTHERS); +} + +TEST(TaosSetSlowLogScopeTest, NoneScopeInput) { + char pScope[] = "none"; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, SLOW_LOG_TYPE_NULL); +} + +TEST(TaosSetSlowLogScopeTest, InvalidScopeInput) { + char pScope[] = "invalid"; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, -1); +} + +TEST(TaosSetSlowLogScopeTest, MixedScopesInput) { + char pScope[] = "query|insert|others|none"; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, (SLOW_LOG_TYPE_QUERY | SLOW_LOG_TYPE_INSERT | SLOW_LOG_TYPE_OTHERS)); +} + +TEST(TaosSetSlowLogScopeTest, MixedScopesInputWithSpaces) { + char pScope[] = "query | insert | others "; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, (SLOW_LOG_TYPE_QUERY | SLOW_LOG_TYPE_INSERT | SLOW_LOG_TYPE_OTHERS)); +} + #pragma GCC diagnostic pop diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 800f7f7864..3e1a633752 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -116,6 +116,13 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 
1 : 0; req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat; req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum; + req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor; + req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval; + req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope; + req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen; + req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold; + req.clusterCfg.monitorParas.tsSlowLogThresholdTest = tsSlowLogThresholdTest; + tstrncpy(req.clusterCfg.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN); char timestr[32] = "1970-01-01 00:00:00.00"; (void)taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 46e606d3ea..dd577f8908 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -143,6 +143,7 @@ typedef enum { DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH, DND_REASON_ENABLE_WHITELIST_NOT_MATCH, DND_REASON_ENCRYPTION_KEY_NOT_MATCH, + DND_REASON_STATUS_MONITOR_NOT_MATCH, DND_REASON_OTHERS } EDndReason; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 8847158c11..f5ab56c1f2 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -438,7 +438,27 @@ void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) { } } +#define CHECK_MONITOR_PARA(para) \ +if (pCfg->monitorParas.para != para) { \ + mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \ + terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS; \ + return DND_REASON_STATUS_MONITOR_NOT_MATCH;\ +} + static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) { + 
CHECK_MONITOR_PARA(tsEnableMonitor); + CHECK_MONITOR_PARA(tsMonitorInterval); + CHECK_MONITOR_PARA(tsSlowLogThreshold); + CHECK_MONITOR_PARA(tsSlowLogThresholdTest); + CHECK_MONITOR_PARA(tsSlowLogMaxLen); + CHECK_MONITOR_PARA(tsSlowLogScope); + + if (0 != strcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb)) { + mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id, pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb); + terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS; + return DND_REASON_STATUS_MONITOR_NOT_MATCH; + } + if (pCfg->statusInterval != tsStatusInterval) { mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusInterval, tsStatusInterval); @@ -530,6 +550,8 @@ static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) { return stateChanged; } +extern char* tsMonFwUri; +extern char* tsMonSlowLogUri; static int32_t mndProcessStatisReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStatisReq statisReq = {0}; @@ -547,188 +569,14 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) { mInfo("process statis req,\n %s", statisReq.pCont); } - SJson *pJson = tjsonParse(statisReq.pCont); - - int32_t ts_size = tjsonGetArraySize(pJson); - - for (int32_t i = 0; i < ts_size; i++) { - SJson *item = tjsonGetArrayItem(pJson, i); - - SJson *tables = tjsonGetObjectItem(item, "tables"); - - int32_t tableSize = tjsonGetArraySize(tables); - for (int32_t i = 0; i < tableSize; i++) { - SJson *table = tjsonGetArrayItem(tables, i); - - char tableName[MONITOR_TABLENAME_LEN] = {0}; - tjsonGetStringValue(table, "name", tableName); - - SJson *metricGroups = tjsonGetObjectItem(table, "metric_groups"); - - int32_t size = tjsonGetArraySize(metricGroups); - for (int32_t i = 0; i < size; i++) { - SJson *item = tjsonGetArrayItem(metricGroups, i); - - SJson *arrayTag = tjsonGetObjectItem(item, "tags"); - - int32_t tagSize = tjsonGetArraySize(arrayTag); - for (int32_t j = 0; j < tagSize; j++) { - 
SJson *item = tjsonGetArrayItem(arrayTag, j); - - char tagName[MONITOR_TAG_NAME_LEN] = {0}; - tjsonGetStringValue(item, "name", tagName); - - if (strncmp(tagName, "cluster_id", MONITOR_TAG_NAME_LEN) == 0) { - tjsonDeleteItemFromObject(item, "value"); - tjsonAddStringToObject(item, "value", strClusterId); - } - } - } - } - } - - char *pCont = tjsonToString(pJson); - monSendContent(pCont); - - if (pJson != NULL) { - tjsonDelete(pJson); - pJson = NULL; - } - - if (pCont != NULL) { - taosMemoryFree(pCont); - pCont = NULL; + if (statisReq.type == MONITOR_TYPE_COUNTER){ + monSendContent(statisReq.pCont, tsMonFwUri); + }else if(statisReq.type == MONITOR_TYPE_SLOW_LOG){ + monSendContent(statisReq.pCont, tsMonSlowLogUri); } tFreeSStatisReq(&statisReq); return 0; - - /* - SJson* pJson = tjsonParse(statisReq.pCont); - - int32_t ts_size = tjsonGetArraySize(pJson); - - for(int32_t i = 0; i < ts_size; i++){ - SJson* item = tjsonGetArrayItem(pJson, i); - - SJson* tables = tjsonGetObjectItem(item, "tables"); - - int32_t tableSize = tjsonGetArraySize(tables); - for(int32_t i = 0; i < tableSize; i++){ - SJson* table = tjsonGetArrayItem(tables, i); - - char tableName[MONITOR_TABLENAME_LEN] = {0}; - tjsonGetStringValue(table, "name", tableName); - - SJson* metricGroups = tjsonGetObjectItem(table, "metric_groups"); - - int32_t size = tjsonGetArraySize(metricGroups); - for(int32_t i = 0; i < size; i++){ - SJson* item = tjsonGetArrayItem(metricGroups, i); - - SJson* arrayTag = tjsonGetObjectItem(item, "tags"); - - int32_t tagSize = tjsonGetArraySize(arrayTag); - - char** labels = taosMemoryMalloc(sizeof(char*) * tagSize); - char** sample_labels = taosMemoryMalloc(sizeof(char*) * tagSize); - - for(int32_t j = 0; j < tagSize; j++){ - SJson* item = tjsonGetArrayItem(arrayTag, j); - - *(labels + j) = taosMemoryMalloc(MONITOR_TAG_NAME_LEN); - tjsonGetStringValue(item, "name", *(labels + j)); - - *(sample_labels + j) = taosMemoryMalloc(MONITOR_TAG_VALUE_LEN); - tjsonGetStringValue(item, 
"value", *(sample_labels + j)); - if(strncmp(*(labels + j), "cluster_id", MONITOR_TAG_NAME_LEN) == 0) { - strncpy(*(sample_labels + j), strClusterId, MONITOR_TAG_VALUE_LEN); - } - } - - SJson* metrics = tjsonGetObjectItem(item, "metrics"); - - int32_t metricLen = tjsonGetArraySize(metrics); - for(int32_t j = 0; j < metricLen; j++){ - SJson *item = tjsonGetArrayItem(metrics, j); - - char name[MONITOR_METRIC_NAME_LEN] = {0}; - tjsonGetStringValue(item, "name", name); - - double value = 0; - tjsonGetDoubleValue(item, "value", &value); - - double type = 0; - tjsonGetDoubleValue(item, "type", &type); - - int32_t metricNameLen = strlen(name) + strlen(tableName) + 2; - char* metricName = taosMemoryMalloc(metricNameLen); - memset(metricName, 0, metricNameLen); - sprintf(metricName, "%s:%s", tableName, name); - - taos_metric_t* metric = taos_collector_registry_get_metric(metricName); - if(metric == NULL){ - if(type == 0){ - metric = taos_counter_new(metricName, "", tagSize, (const char**)labels); - } - if(type == 1){ - metric = taos_gauge_new(metricName, "", tagSize, (const char**)labels); - } - mTrace("fail to get metric from registry, new one metric:%p", metric); - - if(taos_collector_registry_register_metric(metric) == 1){ - if(type == 0){ - taos_counter_destroy(metric); - } - if(type == 1){ - taos_gauge_destroy(metric); - } - - metric = taos_collector_registry_get_metric(metricName); - - mTrace("fail to register metric, get metric from registry:%p", metric); - } - else{ - mTrace("succeed to register metric:%p", metric); - } - } - else{ - mTrace("get metric from registry:%p", metric); - } - - if(type == 0){ - taos_counter_add(metric, value, (const char**)sample_labels); - } - if(type == 1){ - taos_gauge_set(metric, value, (const char**)sample_labels); - } - - taosMemoryFreeClear(metricName); - } - - for(int32_t j = 0; j < tagSize; j++){ - taosMemoryFreeClear(*(labels + j)); - taosMemoryFreeClear(*(sample_labels + j)); - } - - taosMemoryFreeClear(sample_labels); - 
taosMemoryFreeClear(labels); - } - } - - } - - code = 0; - - _OVER: - if(pJson != NULL){ - tjsonDelete(pJson); - pJson = NULL; - } - - tFreeSStatisReq(&statisReq); - return code; - */ } static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) { @@ -1092,6 +940,32 @@ _OVER: return code; } +static void getSlowLogScopeString(int32_t scope, char* result){ + if(scope == SLOW_LOG_TYPE_NULL) { + strcat(result, "NONE"); + return; + } + while(scope > 0){ + if(scope & SLOW_LOG_TYPE_QUERY) { + strcat(result, "QUERY"); + scope &= ~SLOW_LOG_TYPE_QUERY; + } else if(scope & SLOW_LOG_TYPE_INSERT) { + strcat(result, "INSERT"); + scope &= ~SLOW_LOG_TYPE_INSERT; + } else if(scope & SLOW_LOG_TYPE_OTHERS) { + strcat(result, "OTHERS"); + scope &= ~SLOW_LOG_TYPE_OTHERS; + } else{ + printf("invalid slow log scope:%d", scope); + return; + } + + if(scope > 0) { + strcat(result, "|"); + } + } +} + static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) { SShowVariablesRsp rsp = {0}; int32_t code = -1; @@ -1100,7 +974,7 @@ static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) { goto _OVER; } - rsp.variables = taosArrayInit(4, sizeof(SVariablesInfo)); + rsp.variables = taosArrayInit(16, sizeof(SVariablesInfo)); if (NULL == rsp.variables) { mError("failed to alloc SVariablesInfo array while process show variables req"); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1129,6 +1003,33 @@ static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) { strcpy(info.scope, "both"); taosArrayPush(rsp.variables, &info); + strcpy(info.name, "monitor"); + snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor); + strcpy(info.scope, "server"); + taosArrayPush(rsp.variables, &info); + + strcpy(info.name, "monitorInterval"); + snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval); + strcpy(info.scope, "server"); + taosArrayPush(rsp.variables, &info); + + strcpy(info.name, "slowLogThreshold"); + snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold); + 
strcpy(info.scope, "server"); + taosArrayPush(rsp.variables, &info); + + strcpy(info.name, "slowLogMaxLen"); + snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen); + strcpy(info.scope, "server"); + taosArrayPush(rsp.variables, &info); + + char scopeStr[64] = {0}; + getSlowLogScopeString(tsSlowLogScope, scopeStr); + strcpy(info.name, "slowLogScope"); + snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", scopeStr); + strcpy(info.scope, "server"); + taosArrayPush(rsp.variables, &info); + int32_t rspLen = tSerializeSShowVariablesRsp(NULL, 0, &rsp); void *pRsp = rpcMallocCont(rspLen); if (pRsp == NULL) { @@ -1679,6 +1580,28 @@ static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset); totalRows++; + cfgOpts[totalRows] = "monitor"; + snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor); + totalRows++; + + cfgOpts[totalRows] = "monitorInterval"; + snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval); + totalRows++; + + cfgOpts[totalRows] = "slowLogThreshold"; + snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold); + totalRows++; + + cfgOpts[totalRows] = "slowLogMaxLen"; + snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen); + totalRows++; + + char scopeStr[64] = {0}; + getSlowLogScopeString(tsSlowLogScope, scopeStr); + cfgOpts[totalRows] = "slowLogScope"; + snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr); + totalRows++; + char buf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0}; char bufVal[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0}; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 15ad292bf5..4224d79391 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -282,7 +282,6 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { } } -_CONNECT: 
pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp, pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime); if (pConn == NULL) { @@ -301,6 +300,13 @@ _CONNECT: connectRsp.svrTimestamp = taosGetTimestampSec(); connectRsp.passVer = pUser->passVersion; connectRsp.authVer = pUser->authVersion; + connectRsp.monitorParas.tsEnableMonitor = tsEnableMonitor; + connectRsp.monitorParas.tsMonitorInterval = tsMonitorInterval; + connectRsp.monitorParas.tsSlowLogScope = tsSlowLogScope; + connectRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen; + connectRsp.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold; + connectRsp.monitorParas.tsSlowLogThresholdTest = tsSlowLogThresholdTest; + tstrncpy(connectRsp.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN); connectRsp.whiteListVer = pUser->ipWhiteListVer; strcpy(connectRsp.sVer, version); @@ -660,6 +666,13 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) { SClientHbBatchRsp batchRsp = {0}; batchRsp.svrTimestamp = taosGetTimestampSec(); batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp)); + batchRsp.monitorParas.tsEnableMonitor = tsEnableMonitor; + batchRsp.monitorParas.tsMonitorInterval = tsMonitorInterval; + batchRsp.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold; + batchRsp.monitorParas.tsSlowLogThresholdTest = tsSlowLogThresholdTest; + tstrncpy(batchRsp.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN); + batchRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen; + batchRsp.monitorParas.tsSlowLogScope = tsSlowLogScope; int32_t sz = taosArrayGetSize(batchReq.reqs); for (int i = 0; i < sz; i++) { diff --git a/source/dnode/mnode/impl/test/CMakeLists.txt b/source/dnode/mnode/impl/test/CMakeLists.txt index 471f0535b8..c21cc4385b 100644 --- a/source/dnode/mnode/impl/test/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/CMakeLists.txt @@ -16,3 +16,4 @@ add_subdirectory(stb) add_subdirectory(topic) add_subdirectory(trans) 
#add_subdirectory(user) +#add_subdirectory(mnode) diff --git a/source/dnode/mnode/impl/test/mnode/CMakeLists.txt b/source/dnode/mnode/impl/test/mnode/CMakeLists.txt index 2a436e1d59..a5f014e779 100644 --- a/source/dnode/mnode/impl/test/mnode/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/mnode/CMakeLists.txt @@ -1,11 +1,11 @@ -# aux_source_directory(. MNODE_MNODE_TEST_SRC) -# add_executable(mmnodeTest ${MNODE_MNODE_TEST_SRC}) -# target_link_libraries( -# mmnodeTest -# PUBLIC sut -# ) + aux_source_directory(. MNODE_MNODE_TEST_SRC) + add_executable(mmnodeTest ${MNODE_MNODE_TEST_SRC}) + target_link_libraries( + mmnodeTest + PUBLIC sut + ) -# add_test( -# NAME mmnodeTest -# COMMAND mmnodeTest -# ) + add_test( + NAME mmnodeTest + COMMAND mmnodeTest + ) diff --git a/source/dnode/mnode/impl/test/mnode/mnode.cpp b/source/dnode/mnode/impl/test/mnode/mnode.cpp index 1f6dbd6dca..9195b9303b 100644 --- a/source/dnode/mnode/impl/test/mnode/mnode.cpp +++ b/source/dnode/mnode/impl/test/mnode/mnode.cpp @@ -280,4 +280,114 @@ TEST_F(MndTestMnode, 04_Drop_Mnode_Rollback) { ASSERT_NE(retry, retryMax); } +} + +#define SLOW_LOG_TYPE_NULL 0x0 +#define SLOW_LOG_TYPE_QUERY 0x1 +#define SLOW_LOG_TYPE_INSERT 0x2 +#define SLOW_LOG_TYPE_OTHERS 0x4 +#define SLOW_LOG_TYPE_ALL 0x7 +void getSlowLogScopeString(int32_t scope, char* result){ + if(scope == SLOW_LOG_TYPE_NULL) { + strcat(result, "NONE"); + return; + } + while(scope > 0){ + if(scope & SLOW_LOG_TYPE_QUERY) { + strcat(result, "QUERY"); + scope &= ~SLOW_LOG_TYPE_QUERY; + } else if(scope & SLOW_LOG_TYPE_INSERT) { + strcat(result, "INSERT"); + scope &= ~SLOW_LOG_TYPE_INSERT; + } else if(scope & SLOW_LOG_TYPE_OTHERS) { + strcat(result, "OTHERS"); + scope &= ~SLOW_LOG_TYPE_OTHERS; + } else{ + printf("invalid slow log scope:%d", scope); + return; + } + + if(scope > 0) { + strcat(result, "|"); + } + } +} + +// Define test cases +TEST_F(MndTestMnode, ScopeIsNull) { + // Arrange + char result[256] = {0}; + + // Act + 
getSlowLogScopeString(SLOW_LOG_TYPE_NULL, result); + + // Assert + EXPECT_STREQ(result, "NONE"); +} + +TEST_F(MndTestMnode, ScopeIsQuery) { + // Arrange + char result[256] = {0}; + + // Act + getSlowLogScopeString(SLOW_LOG_TYPE_QUERY, result); + + // Assert + EXPECT_STREQ(result, "QUERY"); +} + +TEST_F(MndTestMnode, ScopeIsInsert) { + // Arrange + char result[256] = {0}; + + // Act + getSlowLogScopeString(SLOW_LOG_TYPE_INSERT, result); + + // Assert + EXPECT_STREQ(result, "INSERT"); +} + +TEST_F(MndTestMnode, ScopeIsOthers) { + // Arrange + char result[256] = {0}; + + // Act + getSlowLogScopeString(SLOW_LOG_TYPE_OTHERS, result); + + // Assert + EXPECT_STREQ(result, "OTHERS"); +} + +TEST_F(MndTestMnode, ScopeIsMixed) { + // Arrange + char result[256] = {0}; + + // Act + getSlowLogScopeString(SLOW_LOG_TYPE_OTHERS|SLOW_LOG_TYPE_INSERT, result); + + // Assert + EXPECT_STREQ(result, "INSERT|OTHERS"); +} + +TEST_F(MndTestMnode, ScopeIsMixed1) { + // Arrange + char result[256] = {0}; + + // Act + getSlowLogScopeString(SLOW_LOG_TYPE_ALL, result); + + // Assert + EXPECT_STREQ(result, "QUERY|INSERT|OTHERS"); +} + +TEST_F(MndTestMnode, ScopeIsInvalid) { + // Arrange + char result[256] = {0}; + + // Act + getSlowLogScopeString(0xF000, result); + + // Assert + EXPECT_STREQ(result, ""); // Expect an empty string since the scope is invalid + // You may also want to check if the error message is correctly logged } \ No newline at end of file diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index c5c14950b2..6aaa79bb31 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -363,7 +363,7 @@ typedef struct SCtgUserAuth { } SCtgUserAuth; typedef struct SCatalog { - uint64_t clusterId; + int64_t clusterId; bool stopUpdate; SDynViewVersion dynViewVer; SHashObj* userCache; // key:user, value:SCtgUserAuth diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 
e7d5a89d6f..4048c8841b 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -834,7 +834,7 @@ int32_t catalogInit(SCatalogCfg* cfg) { return TSDB_CODE_SUCCESS; } -int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { +int32_t catalogGetHandle(int64_t clusterId, SCatalog** catalogHandle) { if (NULL == catalogHandle) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 6da1da2b52..eac716339b 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -410,7 +410,7 @@ void ctgFreeHandle(SCatalog* pCtg) { return; } - uint64_t clusterId = pCtg->clusterId; + int64_t clusterId = pCtg->clusterId; ctgFreeMetaRent(&pCtg->dbRent); ctgFreeMetaRent(&pCtg->stbRent); @@ -498,7 +498,7 @@ void ctgClearHandle(SCatalog* pCtg) { return; } - uint64_t clusterId = pCtg->clusterId; + int64_t clusterId = pCtg->clusterId; ctgFreeMetaRent(&pCtg->dbRent); ctgFreeMetaRent(&pCtg->stbRent); diff --git a/source/libs/monitor/src/clientMonitor.c b/source/libs/monitor/src/clientMonitor.c deleted file mode 100644 index 0891932583..0000000000 --- a/source/libs/monitor/src/clientMonitor.c +++ /dev/null @@ -1,210 +0,0 @@ -#include "clientMonitor.h" -#include "os.h" -#include "tmisce.h" -#include "ttime.h" -#include "ttimer.h" -#include "tglobal.h" - -SRWLatch monitorLock; -void* tmrClientMonitor; -tmr_h tmrStartHandle; -SHashObj* clusterMonitorInfoTable; - -static int interval = 30 * 1000; -static int sendBathchSize = 1; - -int32_t sendReport(ClientMonitor* pMonitor, char* pCont); -void generateClusterReport(ClientMonitor* pMonitor, bool send) { - char ts[50]; - sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI)); - char* pCont = (char*)taos_collector_registry_bridge_new(pMonitor->registry, ts, "%" PRId64, NULL); - if(NULL == pCont) { - uError("generateClusterReport failed, get null content."); - return; - } - if (send && 
strlen(pCont) != 0) { - if (sendReport(pMonitor, pCont) == 0) { - taos_collector_registry_clear_batch(pMonitor->registry); - } - } - taosMemoryFreeClear(pCont); -} - -void reportSendProcess(void* param, void* tmrId) { - taosTmrReset(reportSendProcess, tsMonitorInterval * 1000, NULL, tmrClientMonitor, &tmrStartHandle); - taosRLockLatch(&monitorLock); - - static int index = 0; - index++; - ClientMonitor** ppMonitor = (ClientMonitor**)taosHashIterate(clusterMonitorInfoTable, NULL); - while (ppMonitor != NULL && *ppMonitor != NULL) { - ClientMonitor* pMonitor = *ppMonitor; - generateClusterReport(*ppMonitor, index == sendBathchSize); - ppMonitor = taosHashIterate(clusterMonitorInfoTable, ppMonitor); - } - - if (index == sendBathchSize) index = 0; - taosRUnLockLatch(&monitorLock); -} - -void monitorClientInitOnce() { - static int8_t init = 0; - if (atomic_exchange_8(&init, 1) == 0) { - uInfo("tscMonitorInit once."); - clusterMonitorInfoTable = - (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - - tmrClientMonitor = taosTmrInit(0, 0, 0, "MONITOR"); - tmrStartHandle = taosTmrStart(reportSendProcess, tsMonitorInterval * 1000, NULL, tmrClientMonitor); - if(tsMonitorInterval < 1){ - interval = 30 * 1000; - } else { - interval = tsMonitorInterval * 1000; - } - if (tsMonitorInterval < 10) { - sendBathchSize = (10 / sendBathchSize) + 1; - } - taosInitRWLatch(&monitorLock); - } -} - -void createMonitorClient(const char* clusterKey, SEpSet epSet, void* pTransporter) { - if (clusterKey == NULL || strlen(clusterKey) == 0) { - uError("createMonitorClient failed, clusterKey is NULL"); - return; - } - taosWLockLatch(&monitorLock); - if (taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey)) == NULL) { - uInfo("createMonitorClient for %s.", clusterKey); - ClientMonitor* pMonitor = taosMemoryCalloc(1, sizeof(ClientMonitor)); - snprintf(pMonitor->clusterKey, sizeof(pMonitor->clusterKey), "%s", clusterKey); - 
pMonitor->registry = taos_collector_registry_new(clusterKey); - pMonitor->colector = taos_collector_new(clusterKey); - epsetAssign(&pMonitor->epSet, &epSet); - pMonitor->pTransporter = pTransporter; - - taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector); - pMonitor->counters = - (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - - taosHashPut(clusterMonitorInfoTable, clusterKey, strlen(clusterKey), &pMonitor, sizeof(ClientMonitor*)); - uInfo("createMonitorClient for %s finished %p.", clusterKey, pMonitor); - } - taosWUnLockLatch(&monitorLock); -} - -static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) { - static int32_t emptyRspNum = 0; - if (TSDB_CODE_SUCCESS != code) { - uError("found error in monitorReport send callback, code:%d, please check the network.", code); - } - if (pMsg) { - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); - } - return code; -} - -int32_t sendReport(ClientMonitor* pMonitor, char* pCont) { - SStatisReq sStatisReq; - sStatisReq.pCont = pCont; - sStatisReq.contLen = strlen(pCont); - - int tlen = tSerializeSStatisReq(NULL, 0, &sStatisReq); - if (tlen < 0) return 0; - void* buf = taosMemoryMalloc(tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - tSerializeSStatisReq(buf, tlen, &sStatisReq); - - SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); - if (pInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pInfo->fp = monitorReportAsyncCB; - pInfo->msgInfo.pData = buf; - pInfo->msgInfo.len = tlen; - pInfo->msgType = TDMT_MND_STATIS; - // pInfo->param = taosMemoryMalloc(sizeof(int32_t)); - // *(int32_t*)pInfo->param = i; - pInfo->paramFreeFp = taosMemoryFree; - pInfo->requestId = tGenIdPI64(); - pInfo->requestObjRefId = 0; - - int64_t transporterId = 0; - return asyncSendMsgToServer(pMonitor->pTransporter, &pMonitor->epSet, &transporterId, pInfo); -} - -void 
clusterMonitorInit(const char* clusterKey, SEpSet epSet, void* pTransporter) { - monitorClientInitOnce(); - createMonitorClient(clusterKey, epSet, pTransporter); -} - -taos_counter_t* createClusterCounter(const char* clusterKey, const char* name, const char* help, size_t label_key_count, - const char** label_keys) { - ClientMonitor** ppMonitor = (ClientMonitor**)taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey)); - - if (ppMonitor != NULL && *ppMonitor != NULL) { - ClientMonitor* pMonitor = *ppMonitor; - taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, name, strlen(name)); - if (ppCounter != NULL && *ppCounter != NULL) { - taosHashRemove(pMonitor->counters, name, strlen(name)); - uInfo("createClusterCounter remove old counter: %s.", name); - } - - taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys); - if (newCounter != NULL) { - taos_collector_add_metric(pMonitor->colector, newCounter); - taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, sizeof(taos_counter_t*)); - uInfo("createClusterCounter %s(%p):%s : %p.", pMonitor->clusterKey, pMonitor, name, newCounter); - return newCounter; - } else { - return NULL; - } - } else { - return NULL; - } - return NULL; -} - -int taosClusterCounterInc(const char* clusterKey, const char* counterName, const char** label_values) { - taosRLockLatch(&monitorLock); - ClientMonitor** ppMonitor = (ClientMonitor**)taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey)); - - if (ppMonitor != NULL && *ppMonitor != NULL) { - ClientMonitor* pMonitor = *ppMonitor; - taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName)); - if (ppCounter != NULL && *ppCounter != NULL) { - taos_counter_inc(*ppCounter, label_values); - } else { - uError("taosClusterCounterInc not found pCounter %s:%s.", clusterKey, counterName); - } - } else { - uError("taosClusterCounterInc not found pMonitor 
%s.", clusterKey); - } - taosRUnLockLatch(&monitorLock); - return 0; -} - -void clusterMonitorClose(const char* clusterKey) { - taosWLockLatch(&monitorLock); - ClientMonitor** ppMonitor = (ClientMonitor**)taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey)); - - if (ppMonitor != NULL && *ppMonitor != NULL) { - ClientMonitor* pMonitor = *ppMonitor; - uInfo("clusterMonitorClose valule:%p clusterKey:%s.", pMonitor, pMonitor->clusterKey); - taosHashCleanup(pMonitor->counters); - taos_collector_registry_destroy(pMonitor->registry); - taosMemoryFree(pMonitor); - taosHashRemove(clusterMonitorInfoTable, clusterKey, strlen(clusterKey)); - } - taosWUnLockLatch(&monitorLock); -} - -const char* resultStr(SQL_RESULT_CODE code) { - static const char* result_state[] = {"Success", "Failed", "Cancel"}; - return result_state[code]; -} diff --git a/source/libs/monitor/src/monMain.c b/source/libs/monitor/src/monMain.c index 4ecc6f1261..21c196872c 100644 --- a/source/libs/monitor/src/monMain.c +++ b/source/libs/monitor/src/monMain.c @@ -24,6 +24,7 @@ SMonitor tsMonitor = {0}; char* tsMonUri = "/report"; char* tsMonFwUri = "/general-metric"; +char* tsMonSlowLogUri = "/slow-sql-detail-batch"; char* tsMonFwBasicUri = "/taosd-cluster-basic"; void monRecordLog(int64_t ts, ELogLevel level, const char *content) { @@ -631,7 +632,7 @@ void monGenAndSendReportBasic() { monCleanupMonitorInfo(pMonitor); } -void monSendContent(char *pCont) { +void monSendContent(char *pCont, const char* uri) { if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return; if(tsMonitorLogProtocol){ if (pCont != NULL){ @@ -640,7 +641,7 @@ void monSendContent(char *pCont) { } if (pCont != NULL) { EHttpCompFlag flag = tsMonitor.cfg.comp ? 
HTTP_GZIP : HTTP_FLAT; - if (taosSendHttpReport(tsMonitor.cfg.server, tsMonFwUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) { + if (taosSendHttpReport(tsMonitor.cfg.server, uri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) { uError("failed to send monitor msg"); } } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 2d5edcb01b..78ecd55c64 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -7254,6 +7254,7 @@ static int32_t buildCmdMsg(STranslateContext* pCxt, int16_t msgType, FSerializeF pCxt->pCmdMsg->msgLen = func(NULL, 0, pReq); pCxt->pCmdMsg->pMsg = taosMemoryMalloc(pCxt->pCmdMsg->msgLen); if (NULL == pCxt->pCmdMsg->pMsg) { + taosMemoryFreeClear(pCxt->pCmdMsg); return TSDB_CODE_OUT_OF_MEMORY; } func(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, pReq); diff --git a/source/os/src/osEnv.c b/source/os/src/osEnv.c index 28f4178790..948040ac76 100644 --- a/source/os/src/osEnv.c +++ b/source/os/src/osEnv.c @@ -78,7 +78,9 @@ void osDefaultInit() { } strcpy(tsDataDir, TD_DATA_DIR_PATH); strcpy(tsLogDir, TD_LOG_DIR_PATH); - strcpy(tsTempDir, TD_TMP_DIR_PATH); + if(strlen(tsTempDir) == 0){ + strcpy(tsTempDir, TD_TMP_DIR_PATH); + } } void osUpdate() { diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index bdd43fe9fa..ac6cf7bad2 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -66,7 +66,7 @@ typedef struct TdFile { void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath) { #ifdef WINDOWS - const char *tdengineTmpFileNamePrefix = "tdengine-"; + char tmpPath[PATH_MAX]; int32_t len = (int32_t)strlen(inputTmpDir); @@ -76,7 +76,7 @@ void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, cha tmpPath[len++] = '\\'; } - strcpy(tmpPath + len, tdengineTmpFileNamePrefix); + strcpy(tmpPath + len, TD_TMP_FILE_PREFIX); if (strlen(tmpPath) + strlen(fileNamePrefix) + strlen("-%d-%s") < 
PATH_MAX) { strcat(tmpPath, fileNamePrefix); strcat(tmpPath, "-%d-%s"); @@ -88,8 +88,6 @@ void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, cha #else - const char *tdengineTmpFileNamePrefix = "tdengine-"; - char tmpPath[PATH_MAX]; int32_t len = strlen(inputTmpDir); memcpy(tmpPath, inputTmpDir, len); @@ -99,7 +97,7 @@ void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, cha tmpPath[len++] = '/'; } - strcpy(tmpPath + len, tdengineTmpFileNamePrefix); + strcpy(tmpPath + len, TD_TMP_FILE_PREFIX); if (strlen(tmpPath) + strlen(fileNamePrefix) + strlen("-%d-%s") < PATH_MAX) { strcat(tmpPath, fileNamePrefix); strcat(tmpPath, "-%d-%s"); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 80de20f5f5..978dd8ec78 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -371,6 +371,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_ENCRYPTKEY, "invalid encryption ke TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_ENCRYPTKEY_CHANGED, "encryption key was changed") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_ENCRYPT_KLEN, "Invalid encryption key length") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL,"statusInterval not match") +TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_MONITOR_PARAS, "monitor paras not match") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_TIMEZONE, "timezone not match") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_CHARSET, "charset not match") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_INVALID_LOCALE, "locale not match") diff --git a/tests/army/cmdline/fullopt.py b/tests/army/cmdline/fullopt.py index e61501d7b8..77a22b9256 100644 --- a/tests/army/cmdline/fullopt.py +++ b/tests/army/cmdline/fullopt.py @@ -70,7 +70,6 @@ class TDTestCase(TBase): "smlTagName tagname", "smlTsDefaultName tsdef", "serverPort 6030", - "slowLogScope insert", "timezone tz", "tempDir /var/tmp" ] diff --git a/tests/script/tsim/show/basic.sim b/tests/script/tsim/show/basic.sim index 8990e492fa..4b2e33b45e 100644 --- 
a/tests/script/tsim/show/basic.sim +++ b/tests/script/tsim/show/basic.sim @@ -230,7 +230,7 @@ endi sql_error show create stable t0; sql show variables; -if $rows != 4 then +if $rows != 9 then return -1 endi diff --git a/tests/script/tsim/valgrind/checkError1.sim b/tests/script/tsim/valgrind/checkError1.sim index debe633f06..5629fbb4bd 100644 --- a/tests/script/tsim/valgrind/checkError1.sim +++ b/tests/script/tsim/valgrind/checkError1.sim @@ -120,7 +120,7 @@ if $rows != 3 then endi sql show variables; -if $rows != 4 then +if $rows != 9 then return -1 endi diff --git a/tests/system-test/2-query/db.py b/tests/system-test/2-query/db.py index cfe224acb0..e2c056cd5b 100644 --- a/tests/system-test/2-query/db.py +++ b/tests/system-test/2-query/db.py @@ -45,7 +45,7 @@ class TDTestCase: def case2(self): tdSql.query("show variables") - tdSql.checkRows(4) + tdSql.checkRows(9) for i in range(self.replicaVar): tdSql.query("show dnode %d variables like 'debugFlag'" % (i + 1))