// Monitoring parameters delivered to the client by the server. In this change they are
// embedded in SConnectRsp, SClusterCfg and SClientHbBatchRsp, and the heartbeat callback
// copies the received value into SAppInstInfo.monitorParas.
typedef struct {
  bool    tsEnableMonitor;     // gates sqlReqLog/slowQueryLog reporting and slow-log upload on the client
  int32_t tsMonitorInterval;   // report period; multiplied by 1000 before being handed to the timer
                               // (presumably seconds -> milliseconds; confirm against taosTmrStart)
  int32_t tsSlowLogThreshold;  // slow-query threshold; compared as value * 1000000 against a microsecond duration
  int32_t tsSlowLogMaxLen;     // SQL text longer than this is cut to this length in the slow-log JSON
  int32_t tsSlowLogScope;      // bitmask tested against the request's SLOW_LOG_TYPE_* value
} SMonitorParas;
// Size in bytes of the buffer used to read slow-log records from a file and send them
// in one report. The value is parenthesized so the macro expands correctly inside
// larger expressions such as `x / SLOW_LOG_SEND_SIZE` or `x % SLOW_LOG_SEND_SIZE`.
#define SLOW_LOG_SEND_SIZE (1024 * 1024)
// One slow-log record handed from a request thread to the monitor thread through monitorQueue.
typedef struct {
  int64_t clusterId;  // cluster the record belongs to (the producer copies SAppInstInfo.clusterId)
  char *value;        // heap string produced by cJSON_PrintUnformatted(); released by monitorFreeSlowLogData()
} MonitorSlowLogData;
taosCheckAccessFile(const char *pathname, int mode); int32_t taosLockFile(TdFilePtr pFile); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 2fbf4eeabb..7a84215e12 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -115,10 +115,11 @@ struct SAppInstInfo { SArray* pQnodeList; SAppClusterSummary summary; SList* pConnList; // STscObj linked list - uint64_t clusterId; + int64_t clusterId; void* pTransporter; SAppHbMgr* pAppHbMgr; char* instKey; + SMonitorParas monitorParas; }; typedef struct SAppInfo { @@ -127,6 +128,7 @@ typedef struct SAppInfo { int32_t pid; int32_t numOfThreads; SHashObj* pInstMap; + SHashObj* pInstMapByClusterId; TdThreadMutex mutex; } SAppInfo; @@ -350,7 +352,7 @@ void* createTscObj(const char* user, const char* auth, const char* db, int32_ void destroyTscObj(void* pObj); STscObj* acquireTscObj(int64_t rid); int32_t releaseTscObj(int64_t rid); -void destroyAppInst(SAppInstInfo* pAppInfo); +void destroyAppInst(void* pAppInfo); uint64_t generateRequestId(); @@ -403,7 +405,7 @@ void hbRemoveAppHbMrg(SAppHbMgr** pAppHbMgr); void destroyAllRequests(SHashObj* pRequests); void stopAllRequests(SHashObj* pRequests); -SAppInstInfo* getAppInstInfo(const char* clusterKey); +//SAppInstInfo* getAppInstInfo(const char* clusterKey); // conn level int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); @@ -441,10 +443,8 @@ void freeQueryParam(SSyncQueryParam* param); int32_t clientParseSqlImpl(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effeciveUser, SParseSqlRes* pRes); #endif -void clientSlowQueryMonitorInit(const char* clusterKey); -void SlowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost); -void clientSQLReqMonitorInit(const char* clusterKey); +void slowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost); enum { MONITORSQLTYPESELECT = 0, @@ -454,8 +454,6 @@ enum { void sqlReqLog(int64_t rid, bool 
killed, int32_t code, int8_t type); -void clientMonitorClose(const char* clusterKey); - #ifdef __cplusplus } #endif diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 23b9649c6b..0a790e5a8c 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -13,9 +13,12 @@ * along with this program. If not, see . */ +#include +#include "cJSON.h" #include "catalog.h" #include "clientInt.h" #include "clientLog.h" +#include "clientMonitor.h" #include "functionMgt.h" #include "os.h" #include "osSleep.h" @@ -26,6 +29,7 @@ #include "tglobal.h" #include "thttp.h" #include "tmsg.h" +#include "tqueue.h" #include "tref.h" #include "trpc.h" #include "tsched.h" @@ -70,6 +74,72 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) { return TSDB_CODE_SUCCESS; } +static void concatStrings(SArray *list, char* buf, int size){ + int len = 0; + for(int i = 0; i < taosArrayGetSize(list); i++){ + char* db = taosArrayGet(list, i); + int ret = snprintf(buf, size - len, "%s,", db); + if (ret < 0) { + tscError("snprintf failed, buf:%s, ret:%d", buf, ret); + break; + } + len += ret; + if (len >= size){ + tscInfo("dbList is truncated, buf:%s, len:%d", buf, len); + break; + } + } +} + +static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_t reqType, int64_t duration){ + cJSON* json = cJSON_CreateObject(); + if (json == NULL) { + tscError("[monitor] cJSON_CreateObject failed"); + return; + } + cJSON_AddItemToObject(json, "cluster_id", cJSON_CreateNumber(pTscObj->pAppInfo->clusterId)); + cJSON_AddItemToObject(json, "start_ts", cJSON_CreateNumber(pRequest->metric.start)); + cJSON_AddItemToObject(json, "request_id", cJSON_CreateNumber(pRequest->requestId)); + cJSON_AddItemToObject(json, "query_time", cJSON_CreateNumber(duration/1000)); + cJSON_AddItemToObject(json, "code", cJSON_CreateNumber(pRequest->code)); + cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code))); + 
cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)); + cJSON_AddItemToObject(json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.totalRows)); + if(strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen){ + char tmp = pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen]; + pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = '\0'; + cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)); + pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = tmp; + }else{ + cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)); + } + + cJSON_AddItemToObject(json, "user", cJSON_CreateString(pTscObj->user)); + cJSON_AddItemToObject(json, "process_name", cJSON_CreateString(appInfo.appName)); + cJSON_AddItemToObject(json, "ip", cJSON_CreateString(tsLocalFqdn)); + cJSON_AddItemToObject(json, "process_id", cJSON_CreateNumber(appInfo.pid)); + char dbList[1024] = {0}; + concatStrings(pRequest->dbList, dbList, sizeof(dbList)); + cJSON_AddItemToObject(json, "db", cJSON_CreateString(dbList)); + + MonitorSlowLogData* slowLogData = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0); + if (slowLogData == NULL) { + cJSON_Delete(json); + tscError("[monitor] failed to allocate slow log data"); + return; + } + slowLogData->clusterId = pTscObj->pAppInfo->clusterId; + slowLogData->value = cJSON_PrintUnformatted(json); + tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value); + if (taosWriteQitem(monitorQueue, slowLogData) == 0){ + tsem2_post(&monitorSem); + }else{ + monitorFreeSlowLogData(slowLogData); + taosFreeQitem(slowLogData); + } + cJSON_Delete(json); +} + static void deregisterRequest(SRequestObj *pRequest) { if (pRequest == NULL) { tscError("pRequest == NULL"); @@ -83,7 +153,7 @@ static void deregisterRequest(SRequestObj *pRequest) { int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); int32_t 
reqType = SLOW_LOG_TYPE_OTHERS; - int64_t duration = taosGetTimestampUs() - pRequest->metric.start; + int64_t duration = taosGetTimestampUs() - pRequest->metric.start/1000; tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%.2f ms, " "current:%d, app current:%d", @@ -113,21 +183,26 @@ static void deregisterRequest(SRequestObj *pRequest) { nodesSimReleaseAllocator(pRequest->allocatorRefId); } - if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) { - sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT); - } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { - sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPESELECT); - } else if (QUERY_NODE_DELETE_STMT == pRequest->stmtType) { - sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEDELETE); + if(pTscObj->pAppInfo->monitorParas.tsEnableMonitor){ + if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) { + sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT); + } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { + sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPESELECT); + } else if (QUERY_NODE_DELETE_STMT == pRequest->stmtType) { + sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEDELETE); + } } - if (duration >= (tsSlowLogThreshold * 1000000UL)) { + if (duration >= (pTscObj->pAppInfo->monitorParas.tsSlowLogThreshold * 1000000UL || duration >= tsSlowLogThresholdTest)) { atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1); - if (tsSlowLogScope & reqType) { - taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 ", Duration:%" PRId64 "us, SQL:%s", + if (pTscObj->pAppInfo->monitorParas.tsSlowLogScope & reqType) { + taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 " ns, Duration:%" PRId64 
"us, SQL:%s", taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration, pRequest->sqlstr); - SlowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration); + if(pTscObj->pAppInfo->monitorParas.tsEnableMonitor){ + slowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration); + generateWriteSlowLog(pTscObj, pRequest, reqType, duration); + } } } @@ -233,14 +308,13 @@ void stopAllRequests(SHashObj *pRequests) { } } -void destroyAppInst(SAppInstInfo *pAppInfo) { +void destroyAppInst(void *info) { + SAppInstInfo* pAppInfo = (SAppInstInfo*)info; tscDebug("destroy app inst mgr %p", pAppInfo); taosThreadMutexLock(&appInfo.mutex); - clientMonitorClose(pAppInfo->instKey); hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr); - taosHashRemove(appInfo.pInstMap, pAppInfo->instKey, strlen(pAppInfo->instKey)); taosThreadMutexUnlock(&appInfo.mutex); @@ -345,7 +419,7 @@ void *createRequest(uint64_t connId, int32_t type, int64_t reqid) { pRequest->resType = RES_TYPE__QUERY; pRequest->requestId = reqid == 0 ? 
generateRequestId() : reqid; - pRequest->metric.start = taosGetTimestampUs(); + pRequest->metric.start = taosGetTimestampNs(); pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default pRequest->type = type; @@ -670,7 +744,7 @@ void tscStopCrashReport() { } if (atomic_val_compare_exchange_32(&clientStop, 0, 1)) { - tscDebug("hb thread already stopped"); + tscDebug("crash report thread already stopped"); return; } @@ -719,7 +793,8 @@ void taos_init_imp(void) { appInfo.pid = taosGetPId(); appInfo.startTime = taosGetTimestampMs(); appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - + appInfo.pInstMapByClusterId = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst); deltaToUtcInitOnce(); char logDirName[64] = {0}; @@ -769,6 +844,7 @@ void taos_init_imp(void) { taosThreadMutexInit(&appInfo.mutex, NULL); tscCrashReportInit(); + monitorInit(); tscDebug("client is initialized successfully"); } diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 1b6cb8fd22..d4ce42d8b1 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -18,6 +18,7 @@ #include "clientLog.h" #include "scheduler.h" #include "trpc.h" +#include "tglobal.h" typedef struct { union { @@ -67,7 +68,7 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC } static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) { - uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; + int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; for (int i = 0; i < TARRAY_SIZE(clientHbMgr.appHbMgrs); ++i) { SAppHbMgr *hbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); if (!hbMgr || hbMgr->pAppInstInfo->clusterId != clusterId) { @@ -536,6 +537,8 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { } SAppInstInfo *pInst = 
pAppHbMgr->pAppInstInfo; + pInst->monitorParas = pRsp.monitorParas; + tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d", pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold); if (code != 0) { pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1; @@ -593,9 +596,9 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { } tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql)); - desc.stime = pRequest->metric.start / 1000; + desc.stime = pRequest->metric.start / 1000000; desc.queryId = pRequest->requestId; - desc.useconds = now - pRequest->metric.start; + desc.useconds = now - pRequest->metric.start/1000; desc.reqRid = pRequest->self; desc.stableQuery = pRequest->stableQuery; desc.isSubQuery = pRequest->isSubReq; @@ -1115,7 +1118,7 @@ int32_t hbGatherAppInfo(void) { SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); if (pAppHbMgr == NULL) continue; - uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; + int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId)); if (NULL == pApp) { memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary)); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 1057a00725..521eb02a24 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -158,9 +158,6 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas tscDebug("new app inst mgr %p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port); pInst = &p; - - clientSlowQueryMonitorInit(p->instKey); - clientSQLReqMonitorInit(p->instKey); } else { ASSERTS((*pInst) && (*pInst)->pAppHbMgr, "*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL); // reset to 0 in case of conn with duplicated user key but its user has ever been dropped. 
@@ -174,14 +171,14 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType); } -SAppInstInfo* getAppInstInfo(const char* clusterKey) { - SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey)); - if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) { - return *ppAppInstInfo; - } else { - return NULL; - } -} +//SAppInstInfo* getAppInstInfo(const char* clusterKey) { +// SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey)); +// if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) { +// return *ppAppInstInfo; +// } else { +// return NULL; +// } +//} void freeQueryParam(SSyncQueryParam* param) { if (param == NULL) return; @@ -737,7 +734,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList .pNodeList = pNodeList, .pDag = pDag, .sql = pRequest->sqlstr, - .startTs = pRequest->metric.start, + .startTs = pRequest->metric.start/1000, .execFp = NULL, .cbParam = NULL, .chkKillFp = chkRequestKilled, @@ -1208,7 +1205,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat .pDag = pDag, .allocatorRefId = pRequest->allocatorRefId, .sql = pRequest->sqlstr, - .startTs = pRequest->metric.start, + .startTs = pRequest->metric.start/1000, .execFp = schedulerExecCb, .cbParam = pWrapper, .chkKillFp = chkRequestKilled, diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 81ae465cd2..ba9493258b 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -17,6 +17,7 @@ #include "clientInt.h" #include "clientLog.h" #include "clientStmt.h" +#include "clientMonitor.h" #include "functionMgt.h" #include "os.h" #include "query.h" @@ -55,6 +56,9 @@ void taos_cleanup(void) { return; } + monitorClose(); + taosHashCleanup(appInfo.pInstMap); + taosHashCleanup(appInfo.pInstMapByClusterId); 
tscStopCrashReport(); hbMgrCleanUp(); @@ -279,7 +283,6 @@ void taos_close_internal(void *taos) { STscObj *pTscObj = (STscObj *)taos; tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs); - // clientMonitorClose(pTscObj->pAppInfo->instKey); taosRemoveRef(clientConnRefPool, pTscObj->id); } diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c new file mode 100644 index 0000000000..86e4da4517 --- /dev/null +++ b/source/client/src/clientMonitor.c @@ -0,0 +1,616 @@ +#include "clientMonitor.h" +#include "os.h" +#include "tmisce.h" +#include "ttime.h" +#include "ttimer.h" +#include "tglobal.h" +#include "tqueue.h" +#include "cJSON.h" +#include "clientInt.h" + +SRWLatch monitorLock; +void* monitorTimer; +SHashObj* monitorCounterHash; +int32_t monitorStop = -1; +tsem2_t monitorSem; +STaosQueue* monitorQueue; +SHashObj* monitorSlowLogHash; + +static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size){ + if (tsTempDir == NULL) { + return -1; + } + int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir); + if (ret < 0){ + uError("failed to get tmp path ret:%d", ret); + return ret; + } + return 0; +} + +static void destroyCounter(void* data){ + if (data == NULL) { + return; + } + taos_counter_t* conuter = *(taos_counter_t**)data; + if(conuter == NULL){ + return; + } + taos_counter_destroy(conuter); +} + +static void destroyClientFile(void* data){ + if (data == NULL) { + return; + } + TdFilePtr pFile = *(TdFilePtr*)data; + if(pFile == NULL){ + return; + } + taosUnLockFile(pFile); + taosCloseFile(&pFile); +} + +static void destroyMonitorClient(void* data){ + if (data == NULL) { + return; + } + MonitorClient* pMonitor = *(MonitorClient**)data; + if(pMonitor == NULL){ + return; + } + taosHashCleanup(pMonitor->counters); + taos_collector_registry_destroy(pMonitor->registry); + taos_collector_destroy(pMonitor->colector); + taosMemoryFree(pMonitor); +} + +static SAppInstInfo* 
getAppInstByClusterId(int64_t clusterId) { + void *p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES); + if(p == NULL){ + uError("failed to get app inst, clusterId:%" PRIx64, clusterId); + return NULL; + } + return *(SAppInstInfo**)p; +} + +static int32_t tscMonitortInit() { + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + TdThread monitorThread; + if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) { + uError("failed to create monitor thread since %s", strerror(errno)); + return -1; + } + + taosThreadAttrDestroy(&thAttr); + return 0; +} + +static void tscMonitorStop() { + if (atomic_val_compare_exchange_32(&monitorStop, 0, 1)) { + uDebug("monitor thread already stopped"); + return; + } + + while (atomic_load_32(&monitorStop) > 0) { + taosMsleep(100); + } +} + +static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) { + if (TSDB_CODE_SUCCESS != code) { + uError("found error in monitorReport send callback, code:%d, please check the network.", code); + } + if (pMsg) { + taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); + } + return code; +} + +static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITOR_TYPE type) { + SStatisReq sStatisReq; + sStatisReq.pCont = pCont; + sStatisReq.contLen = strlen(pCont); + sStatisReq.type = type; + + int tlen = tSerializeSStatisReq(NULL, 0, &sStatisReq); + if (tlen < 0) return 0; + void* buf = taosMemoryMalloc(tlen); + if (buf == NULL) { + uError("sendReport failed, out of memory, len:%d", tlen); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + tSerializeSStatisReq(buf, tlen, &sStatisReq); + + SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + if (pInfo == NULL) { + uError("sendReport failed, out of memory send info"); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pInfo->fp = monitorReportAsyncCB; + pInfo->msgInfo.pData = buf; + 
pInfo->msgInfo.len = tlen; + pInfo->msgType = TDMT_MND_STATIS; + // pInfo->param = taosMemoryMalloc(sizeof(int32_t)); + // *(int32_t*)pInfo->param = i; + pInfo->paramFreeFp = taosMemoryFree; + pInfo->requestId = tGenIdPI64(); + pInfo->requestObjRefId = 0; + + int64_t transporterId = 0; + int32_t code = asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo); + if (code != TSDB_CODE_SUCCESS) { + uError("sendReport failed, code:%d", code); + } + return code; +} + +void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *epSet){ + char buf[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log + int32_t offset = 0; + if(taosLSeekFile(pFile, 0, SEEK_SET) < 0){ + uError("failed to seek file:%p code: %d", pFile, errno); + return; + } + while(1){ + int64_t readSize = taosReadFile(pFile, buf + offset, SLOW_LOG_SEND_SIZE - offset); + if (readSize <= 0) { + uError("failed to read len from file:%p since %s", pFile, terrstr()); + return; + } + + cJSON* pArray = cJSON_CreateArray(); + char* string = buf; + for(int i = 0; i < readSize + offset; i++){ + if (buf[i] == '\0') { + cJSON_AddItemToArray(pArray, cJSON_CreateString(string)); + uDebug("[monitor] slow log:%s", string); + string = buf + i + 1; + } + } + + ASSERT(cJSON_GetArraySize(pArray) > 0); // make sure SLOW_LOG_SEND_SIZE is bigger than one line in pFile + char* pCont = cJSON_PrintUnformatted(pArray); + if (pTransporter && pCont != NULL) { + if(sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_SLOW_LOG) != 0){ + if(taosLSeekFile(pFile, -readSize, SEEK_CUR) < 0){ + uError("failed to seek file:%p code: %d", pFile, errno); + } + uError("failed to send report:%s", pCont); + cJSON_free(pCont); + cJSON_Delete(pArray); + return; + } + uDebug("[monitor] send slow log:%s", pCont) + } + cJSON_free(pCont); + cJSON_Delete(pArray); + + if (readSize + offset < SLOW_LOG_SEND_SIZE) { + break; + } + offset = SLOW_LOG_SEND_SIZE - (string - buf); + if(buf != string && offset != 0){ + memmove(buf, string, 
offset); + uDebug("[monitor] left slow log:%s", buf) + } + } + if(taosFtruncateFile(pFile, 0) < 0){ + uError("failed to truncate file:%p code: %d", pFile, errno); + } + uDebug("[monitor] send slow log file:%p", pFile); +} + +static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet *epSet) { + char ts[50] = {0}; + sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI)); + char* pCont = (char*)taos_collector_registry_bridge_new(registry, ts, "%" PRId64, NULL); + if(NULL == pCont) { + uError("generateClusterReport failed, get null content."); + return; + } + + if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER) == 0) { + taos_collector_registry_clear_batch(registry); + } + taosMemoryFreeClear(pCont); +} + +static void reportSendProcess(void* param, void* tmrId) { + taosRLockLatch(&monitorLock); + MonitorClient* pMonitor = (MonitorClient*)param; + SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId); + if(pInst == NULL){ + taosRUnLockLatch(&monitorLock); + return; + } + + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep); + taosRUnLockLatch(&monitorLock); + taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId); +} + +static void sendAllSlowLog(){ + taosRLockLatch(&monitorLock); + void* data = taosHashIterate(monitorSlowLogHash, NULL); + while (data != NULL) { + TdFilePtr pFile = *(TdFilePtr*)data; + if (pFile != NULL){ + int64_t clusterId = *(int64_t*)taosHashGetKey(data, NULL); + SAppInstInfo* pInst = getAppInstByClusterId(clusterId); + if(pInst == NULL){ + taosHashCancelIterate(monitorSlowLogHash, data); + break; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + monitorReadSendSlowLog(pFile, pInst->pTransporter, &ep); + } + data = taosHashIterate(monitorSlowLogHash, data); + } + taosRUnLockLatch(&monitorLock); +} + +void monitorSendAllSlowLogFromTempDir(void* 
inst){ + SAppInstInfo* pInst = (SAppInstInfo*)inst; + if(pInst == NULL || !pInst->monitorParas.tsEnableMonitor){ + uInfo("[monitor] monitor is disabled, skip send slow log"); + return; + } + char namePrefix[PATH_MAX] = {0}; + if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, pInst->clusterId) < 0) { + uError("failed to generate slow log file name prefix"); + return; + } + + taosRLockLatch(&monitorLock); + + char tmpPath[PATH_MAX] = {0}; + if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) { + goto END; + } + + TdDirPtr pDir = taosOpenDir(tmpPath); + if (pDir == NULL) { + goto END; + } + + TdDirEntryPtr de = NULL; + while ((de = taosReadDir(pDir)) != NULL) { + if (taosDirEntryIsDir(de)) { + continue; + } + + char *name = taosGetDirEntryName(de); + if (strcmp(name, ".") == 0 || + strcmp(name, "..") == 0 || + strstr(name, namePrefix) == NULL) { + uInfo("skip file:%s, for cluster id:%"PRIx64, name, pInst->clusterId); + continue; + } + + char filename[PATH_MAX] = {0}; + snprintf(filename, sizeof(filename), "%s%s", tmpPath, name); + uDebug("[monitor] send slow log file:%s", filename); + TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ); + if (pFile == NULL) { + uError("failed to open file:%s since %s", filename, terrstr()); + continue; + } + if (taosLockFile(pFile) < 0) { + uError("failed to lock file:%s since %s, maybe used by other process", filename, terrstr()); + taosCloseFile(&pFile); + continue; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + monitorReadSendSlowLog(pFile, pInst->pTransporter, &ep); + taosUnLockFile(pFile); + taosCloseFile(&pFile); + taosRemoveFile(filename); + } + + taosCloseDir(&pDir); + +END: + taosRUnLockLatch(&monitorLock); +} + +static void sendAllCounter(){ + taosRLockLatch(&monitorLock); + MonitorClient** ppMonitor = (MonitorClient**)taosHashIterate(monitorCounterHash, NULL); + while (ppMonitor != NULL) { + MonitorClient* pMonitor = *ppMonitor; + if (pMonitor != NULL){ + SAppInstInfo* pInst = 
getAppInstByClusterId(pMonitor->clusterId); + if(pInst == NULL){ + taosHashCancelIterate(monitorCounterHash, ppMonitor); + break; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep); + } + ppMonitor = taosHashIterate(monitorCounterHash, ppMonitor); + } + taosRUnLockLatch(&monitorLock); +} + +void monitorInit() { + uInfo("[monitor] tscMonitor init"); + monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (monitorCounterHash == NULL) { + uError("failed to create monitorCounterHash"); + } + taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient); + + monitorSlowLogHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (monitorSlowLogHash == NULL) { + uError("failed to create monitorSlowLogHash"); + } + taosHashSetFreeFp(monitorSlowLogHash, destroyClientFile); + + monitorTimer = taosTmrInit(0, 0, 0, "MONITOR"); + if (monitorTimer == NULL) { + uError("failed to create monitor timer"); + } + + taosInitRWLatch(&monitorLock); + tscMonitortInit(); +} + +void monitorClose() { + uInfo("[monitor] tscMonitor close"); + tscMonitorStop(); + sendAllSlowLog(); + sendAllCounter(); + taosHashCleanup(monitorCounterHash); + taosHashCleanup(monitorSlowLogHash); + taosTmrCleanUp(monitorTimer); +} + +void monitorCreateClient(int64_t clusterId) { + MonitorClient* pMonitor = NULL; + taosWLockLatch(&monitorLock); + if (taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES) == NULL) { + uInfo("[monitor] monitorCreateClient for %" PRIx64, clusterId); + pMonitor = taosMemoryCalloc(1, sizeof(MonitorClient)); + if (pMonitor == NULL) { + uError("failed to create monitor client"); + goto fail; + } + pMonitor->clusterId = clusterId; + char clusterKey[32] = {0}; + if(snprintf(clusterKey, sizeof(clusterKey), "%"PRId64, clusterId) < 0){ + uError("failed to create cluster key"); + goto fail; + } + 
pMonitor->registry = taos_collector_registry_new(clusterKey);
+    if(pMonitor->registry == NULL){
+      uError("failed to create registry");
+      goto fail;
+    }
+    pMonitor->colector = taos_collector_new(clusterKey);
+    if(pMonitor->colector == NULL){
+      uError("failed to create collector");
+      goto fail;
+    }
+
+    taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector);
+    pMonitor->counters = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
+    if (pMonitor->counters == NULL) {
+      uError("failed to create monitor counters");
+      goto fail;
+    }
+    taosHashSetFreeFp(pMonitor->counters, destroyCounter);
+
+    if(taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0){
+      uError("failed to put monitor client to hash");
+      goto fail;
+    }
+
+    SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
+    if(pInst == NULL){
+      uError("failed to get app instance by cluster id");
+      pMonitor = NULL;
+      goto fail;
+    }
+    taosTmrStart(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, (void*)pMonitor, monitorTimer);
+    uInfo("[monitor] monitorCreateClient for %"PRIx64 "finished %p.", clusterId, pMonitor);
+  }
+  taosWUnLockLatch(&monitorLock);
+  return;
+
+fail:
+  destroyMonitorClient(&pMonitor);
+  taosWUnLockLatch(&monitorLock);
+}
+
+void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count, const char** label_keys) {  // creates a counter, adds it to the cluster's collector and indexes it by name, all under the write latch
+  taosWLockLatch(&monitorLock);
+  MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
+  if (ppMonitor == NULL || *ppMonitor == NULL) {
+    uError("failed to get monitor client");
+    goto end;
+  }
+  taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys);
+  if (newCounter == NULL)
+    goto end;  // leave through the unlock path; a bare return here kept monitorLock write-held
+  MonitorClient* pMonitor = *ppMonitor;
+  taos_collector_add_metric(pMonitor->colector, newCounter);
+  if(taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0){
+    uError("failed to put counter to monitor");
+    taos_counter_destroy(newCounter);
+    goto end;
+  }
+  uInfo("[monitor] monitorCreateClientCounter %"PRIx64"(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, newCounter);
+
+end:
+  taosWUnLockLatch(&monitorLock);
+}
+
+void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values) {  // looks up the counter by cluster id and name under the read latch, then increments it
+  taosRLockLatch(&monitorLock);
+  MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
+  if (ppMonitor == NULL || *ppMonitor == NULL) {
+    uError("monitorCounterInc not found pMonitor %"PRId64, clusterId);
+    goto end;
+  }
+
+  MonitorClient* pMonitor = *ppMonitor;
+  taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName));
+  if (ppCounter == NULL || *ppCounter == NULL) {  // bail out only when the counter is missing; "!= NULL" rejected every valid counter
+    uError("monitorCounterInc not found pCounter %"PRIx64":%s.", clusterId, counterName);
+    goto end;
+  }
+  taos_counter_inc(*ppCounter, label_values);
+  uInfo("[monitor] monitorCounterInc %"PRIx64"(%p):%s", pMonitor->clusterId, pMonitor, counterName);
+
+end:
+  taosRUnLockLatch(&monitorLock);
+}
+
+const char* monitorResultStr(SQL_RESULT_CODE code) {  // maps a result code to its label string; code is used as an index without a range check
+  static const char* result_state[] = {"Success", "Failed", "Cancel"};
+  return result_state[code];
+}
+
+void monitorFreeSlowLogData(MonitorSlowLogData* pData) {  // frees pData->value only; pData itself is not released here
+  if (pData == NULL) {
+    return;
+  }
+  taosMemoryFree(pData->value);
+}
+
+void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&monitorStop, -1); }
+
+void reportSlowLog(void* param, void* tmrId) {
+  taosRLockLatch(&monitorLock);
+  SAppInstInfo* pInst = getAppInstByClusterId((int64_t)param);
+  if(pInst == NULL){
+    uError("failed to get app inst, clusterId:%"PRIx64, (int64_t)param);
+    taosRUnLockLatch(&monitorLock);
+    return;
+  }
+
+  void* tmp = taosHashGet(monitorSlowLogHash, &param, LONG_BYTES);  // key bytes are the cluster id carried in the timer param
+  if(tmp == NULL){
+    uError("failed to get file inst, clusterId:%"PRIx64,
(int64_t)param); + taosRUnLockLatch(&monitorLock); + return; + } + + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + monitorReadSendSlowLog(*(TdFilePtr*)tmp, pInst->pTransporter, &ep); + taosRUnLockLatch(&monitorLock); + + taosTmrReset(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId); +} + +void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpPath){ + taosRLockLatch(&monitorLock); + TdFilePtr pFile = NULL; + void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES); + if (tmp == NULL){ + char path[PATH_MAX] = {0}; + char clusterId[32] = {0}; + if (snprintf(clusterId, sizeof(clusterId), "%" PRIx64, slowLogData->clusterId) < 0){ + uError("failed to generate clusterId:%" PRIx64, slowLogData->clusterId); + goto FAILED; + } + taosGetTmpfilePath(tmpPath, clusterId, path); + uInfo("[monitor] create slow log file:%s", path); + pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); + if (pFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to open file:%s since %s", path, terrstr()); + goto FAILED; + } + + if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pFile, POINTER_BYTES) != 0){ + uError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId); + taosCloseFile(&pFile); + goto FAILED; + } + + if(taosLockFile(pFile) < 0){ + uError("failed to lock file:%p since %s", pFile, terrstr()); + goto FAILED; + } + + SAppInstInfo* pInst = getAppInstByClusterId(slowLogData->clusterId); + if(pInst == NULL){ + uError("failed to get app instance by clusterId:%" PRId64, slowLogData->clusterId); + goto FAILED; + } + + taosTmrStart(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, (void*)slowLogData->clusterId, monitorTimer); + }else{ + pFile = *(TdFilePtr*)tmp; + } + + if (taosWriteFile(pFile, slowLogData->value, strlen(slowLogData->value) + 1) < 0){ + uError("failed to write len to 
file:%p since %s", pFile, terrstr()); + } + uDebug("[monitor] write slow log to file:%p, clusterId:%"PRIx64, pFile, slowLogData->clusterId); + +FAILED: + taosRUnLockLatch(&monitorLock); +} + +void* monitorThreadFunc(void *param){ + setThreadName("client-monitor-slowlog"); + +#ifdef WINDOWS + if (taosCheckCurrentInDll()) { + atexit(monitorThreadFuncUnexpectedStopped); + } +#endif + + if (-1 != atomic_val_compare_exchange_32(&monitorStop, -1, 0)) { + return NULL; + } + + char tmpPath[PATH_MAX] = {0}; + if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0){ + return NULL; + } + + if (taosMulModeMkDir(tmpPath, 0777, true) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + printf("failed to create dir:%s since %s", tmpPath, terrstr()); + return NULL; + } + + if (tsem2_init(&monitorSem, 0, 0) != 0) { + uError("sem init error since %s", terrstr()); + return NULL; + } + + monitorQueue = taosOpenQueue(); + if(monitorQueue == NULL){ + uError("open queue error since %s", terrstr()); + return NULL; + } + while (1) { + if (monitorStop > 0) break; + + MonitorSlowLogData* slowLogData = NULL; + taosReadQitem(monitorQueue, (void**)&slowLogData); + if (slowLogData != NULL) { + uDebug("[monitor] read slow log data from queue, clusterId:%" PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value); + monitorWriteSlowLog2File(slowLogData, tmpPath); + } + monitorFreeSlowLogData(slowLogData); + taosFreeQitem(slowLogData); + tsem2_wait(&monitorSem); + } + + taosCloseQueue(monitorQueue); + tsem2_destroy(&monitorSem); + monitorStop = -2; + return NULL; +} \ No newline at end of file diff --git a/source/client/src/slowQueryMonitor.c b/source/client/src/clientMonitorSlow.c similarity index 64% rename from source/client/src/slowQueryMonitor.c rename to source/client/src/clientMonitorSlow.c index b41343443d..192792f43e 100644 --- a/source/client/src/slowQueryMonitor.c +++ b/source/client/src/clientMonitorSlow.c @@ -21,7 +21,6 @@ const char* slowQueryName = "taos_slow_sql:count"; const char* 
slowQueryHelp = "slow query log when cost over than config duration"; const int slowQueryLabelCount = 4; const char* slowQueryLabels[] = {"cluster_id", "username", "result", "duration"}; -static const char* defaultClusterID = ""; const int64_t usInSeconds = 1000 * 1000; const int64_t msInMinutes = 60 * 1000; @@ -39,21 +38,21 @@ static const char* getSlowQueryLableCostDesc(int64_t cost) { return "0-3s"; } -void clientSlowQueryMonitorInit(const char* clusterKey) { - if (!tsEnableMonitor) return; - SAppInstInfo* pAppInstInfo = getAppInstInfo(clusterKey); - SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); - clusterMonitorInit(clusterKey, epSet, pAppInstInfo->pTransporter); - createClusterCounter(clusterKey, slowQueryName, slowQueryHelp, slowQueryLabelCount, slowQueryLabels); +void monitorClientSlowQueryInit(int64_t clusterid) { + monitorCreateClient(clusterid); + monitorCreateClientCounter(clusterid, slowQueryName, slowQueryHelp, slowQueryLabelCount, slowQueryLabels); } -void clientSlowQueryLog(const char* clusterKey, const char* user, SQL_RESULT_CODE result, int32_t cost) { - const char* slowQueryLabelValues[] = {defaultClusterID, user, resultStr(result), getSlowQueryLableCostDesc(cost)}; - taosClusterCounterInc(clusterKey, slowQueryName, slowQueryLabelValues); +void clientSlowQueryLog(int64_t clusterId, const char* user, SQL_RESULT_CODE result, int32_t cost) { + char clusterIdStr[32] = {0}; + if (snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId) < 0){ + uError("failed to generate clusterId:%" PRId64, clusterId); + } + const char* slowQueryLabelValues[] = {clusterIdStr, user, monitorResultStr(result), getSlowQueryLableCostDesc(cost)}; + monitorCounterInc(clusterId, slowQueryName, slowQueryLabelValues); } -void SlowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost) { - if (!tsEnableMonitor) return; +void slowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost) { SQL_RESULT_CODE result = SQL_RESULT_SUCCESS; if (TSDB_CODE_SUCCESS 
!= code) { result = SQL_RESULT_FAILED; @@ -66,12 +65,12 @@ void SlowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost) { STscObj* pTscObj = acquireTscObj(rid); if (pTscObj != NULL) { if(pTscObj->pAppInfo == NULL) { - tscLog("SlowQueryLog, not found pAppInfo"); + tscLog("slowQueryLog, not found pAppInfo"); } else { - clientSlowQueryLog(pTscObj->pAppInfo->instKey, pTscObj->user, result, cost); + clientSlowQueryLog(pTscObj->pAppInfo->clusterId, pTscObj->user, result, cost); } releaseTscObj(rid); } else { - tscLog("SlowQueryLog, not found rid"); + tscLog("slowQueryLog, not found rid"); } } diff --git a/source/client/src/clientSqlMonitor.c b/source/client/src/clientMonitorSql.c similarity index 64% rename from source/client/src/clientSqlMonitor.c rename to source/client/src/clientMonitorSql.c index 572af7ff55..19d5b7506e 100644 --- a/source/client/src/clientSqlMonitor.c +++ b/source/client/src/clientMonitorSql.c @@ -22,17 +22,12 @@ const char* selectMonitorHelp = "count for select sql"; const int selectMonitorLabelCount = 4; const char* selectMonitorLabels[] = {"cluster_id", "sql_type", "username", "result"}; -static const char* defaultClusterID = ""; - -void clientSQLReqMonitorInit(const char* clusterKey) { - if (!tsEnableMonitor) return; - SAppInstInfo* pAppInstInfo = getAppInstInfo(clusterKey); - SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); - clusterMonitorInit(clusterKey, epSet, pAppInstInfo->pTransporter); - createClusterCounter(clusterKey, selectMonitorName, selectMonitorHelp, selectMonitorLabelCount, selectMonitorLabels); +void monitorClientSQLReqInit(int64_t clusterId) { + monitorCreateClient(clusterId); + monitorCreateClientCounter(clusterId, selectMonitorName, selectMonitorHelp, selectMonitorLabelCount, selectMonitorLabels); } -void clientSQLReqLog(const char* clusterKey, const char* user, SQL_RESULT_CODE result, int8_t type) { +void clientSQLReqLog(int64_t clusterId, const char* user, SQL_RESULT_CODE result, int8_t type) { const char* 
typeStr; switch (type) { case MONITORSQLTYPEDELETE: @@ -45,12 +40,15 @@ void clientSQLReqLog(const char* clusterKey, const char* user, SQL_RESULT_CODE r typeStr = "select"; break; } - const char* selectMonitorLabelValues[] = {defaultClusterID, typeStr, user, resultStr(result)}; - taosClusterCounterInc(clusterKey, selectMonitorName, selectMonitorLabelValues); + char clusterIdStr[32] = {0}; + if (snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId) < 0){ + uError("failed to generate clusterId:%" PRId64, clusterId); + } + const char* selectMonitorLabelValues[] = {clusterIdStr, typeStr, user, monitorResultStr(result)}; + monitorCounterInc(clusterId, selectMonitorName, selectMonitorLabelValues); } void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type) { - if (!tsEnableMonitor) return; SQL_RESULT_CODE result = SQL_RESULT_SUCCESS; if (TSDB_CODE_SUCCESS != code) { result = SQL_RESULT_FAILED; @@ -65,15 +63,10 @@ void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type) { if (pTscObj->pAppInfo == NULL) { tscLog("sqlReqLog, not found pAppInfo"); } else { - clientSQLReqLog(pTscObj->pAppInfo->instKey, pTscObj->user, result, type); + clientSQLReqLog(pTscObj->pAppInfo->clusterId, pTscObj->user, result, type); } releaseTscObj(rid); } else { tscLog("sqlReqLog, not found rid"); } } - -void clientMonitorClose(const char* clusterKey) { - tscLog("clientMonitorClose, key:%s", clusterKey); - clusterMonitorClose(clusterKey); -} diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index f04e5e53c6..effdc693cf 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -15,6 +15,7 @@ #include "catalog.h" #include "clientInt.h" +#include "clientMonitor.h" #include "clientLog.h" #include "cmdnodes.h" #include "os.h" @@ -140,6 +141,8 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { // update the appInstInfo pTscObj->pAppInfo->clusterId = connectRsp.clusterId; + 
pTscObj->pAppInfo->monitorParas = connectRsp.monitorParas; + tscDebug("[monitor] paras from connect rsp, clusterId:%" PRIx64 " monitorParas threshold:%d", connectRsp.clusterId, connectRsp.monitorParas.tsSlowLogThreshold); lastClusterId = connectRsp.clusterId; pTscObj->connType = connectRsp.connType; @@ -147,6 +150,15 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { pTscObj->authVer = connectRsp.authVer; pTscObj->whiteListInfo.ver = connectRsp.whiteListVer; + if(taosHashGet(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES) == NULL){ + if(taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo, POINTER_BYTES) != 0){ + tscError("failed to put appInfo into appInfo.pInstMapByClusterId"); + } + monitorSendAllSlowLogFromTempDir(pTscObj->pAppInfo); + monitorClientSlowQueryInit(connectRsp.clusterId); + monitorClientSQLReqInit(connectRsp.clusterId); + } + taosThreadMutexLock(&clientHbMgr.lock); SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx); if (pAppHbMgr) { @@ -233,7 +245,7 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { struct SCatalog* pCatalog = NULL; if (usedbRsp.vgVersion >= 0) { // cached in local - uint64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId; + int64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId; int32_t code1 = catalogGetHandle(clusterId, &pCatalog); if (code1 != TSDB_CODE_SUCCESS) { tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId, diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 6eb7abe0eb..a771fc1635 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1851,7 +1851,7 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL } void smlSetReqSQL(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd) { - if (tsSlowLogScope & SLOW_LOG_TYPE_INSERT) 
{ + if (request->pTscObj->pAppInfo->monitorParas.tsSlowLogScope & SLOW_LOG_TYPE_INSERT) { int32_t len = 0; int32_t rlen = 0; char *p = NULL; diff --git a/source/client/test/clientMonitorTests.cpp b/source/client/test/clientMonitorTests.cpp index f7ddc1a5cd..c74f4f7290 100644 --- a/source/client/test/clientMonitorTests.cpp +++ b/source/client/test/clientMonitorTests.cpp @@ -35,34 +35,34 @@ int main(int argc, char** argv) { return RUN_ALL_TESTS(); } -TEST(clientMonitorTest, monitorTest) { - const char* cluster1 = "cluster1"; - const char* cluster2 = "cluster2"; - SEpSet epSet; - clusterMonitorInit(cluster1, epSet, NULL); - const char* counterName1 = "slow_query"; - const char* counterName2 = "select_count"; - const char* help1 = "test for slowQuery"; - const char* help2 = "test for selectSQL"; - const char* lables[] = {"lable1"}; - taos_counter_t* c1 = createClusterCounter(cluster1, counterName1, help1, 1, lables); - ASSERT_TRUE(c1 != NULL); - taos_counter_t* c2 = createClusterCounter(cluster1, counterName2, help2, 1, lables); - ASSERT_TRUE(c2 != NULL); - ASSERT_TRUE(c1 != c2); - taos_counter_t* c21 = createClusterCounter(cluster2, counterName1, help2, 1, lables); - ASSERT_TRUE(c21 == NULL); - clusterMonitorInit(cluster2, epSet, NULL); - c21 = createClusterCounter(cluster2, counterName1, help2, 1, lables); - ASSERT_TRUE(c21 != NULL); - int i = 0; - while (i < 12) { - taosMsleep(10); - ++i; - } - clusterMonitorClose(cluster1); - clusterMonitorClose(cluster2); -} +//TEST(clientMonitorTest, monitorTest) { +// const char* cluster1 = "cluster1"; +// const char* cluster2 = "cluster2"; +// SEpSet epSet; +// clientMonitorInit(cluster1, epSet, NULL); +// const char* counterName1 = "slow_query"; +// const char* counterName2 = "select_count"; +// const char* help1 = "test for slowQuery"; +// const char* help2 = "test for selectSQL"; +// const char* lables[] = {"lable1"}; +// taos_counter_t* c1 = createClusterCounter(cluster1, counterName1, help1, 1, lables); +// ASSERT_TRUE(c1 
!= NULL); +// taos_counter_t* c2 = createClusterCounter(cluster1, counterName2, help2, 1, lables); +// ASSERT_TRUE(c2 != NULL); +// ASSERT_TRUE(c1 != c2); +// taos_counter_t* c21 = createClusterCounter(cluster2, counterName1, help2, 1, lables); +// ASSERT_TRUE(c21 == NULL); +// clientMonitorInit(cluster2, epSet, NULL); +// c21 = createClusterCounter(cluster2, counterName1, help2, 1, lables); +// ASSERT_TRUE(c21 != NULL); +// int i = 0; +// while (i < 12) { +// taosMsleep(10); +// ++i; +// } +// clusterMonitorClose(cluster1); +// clusterMonitorClose(cluster2); +//} TEST(clientMonitorTest, sendTest) { TAOS* taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0); @@ -70,13 +70,100 @@ TEST(clientMonitorTest, sendTest) { printf("connect taosd sucessfully.\n"); int64_t rid = *(int64_t *)taos; - SlowQueryLog(rid, false, -1, 1000); + slowQueryLog(rid, false, -1, 1000); int i = 0; while (i < 20) { - SlowQueryLog(rid, false, 0, i * 1000); + slowQueryLog(rid, false, 0, i * 1000); taosMsleep(10); ++i; } taos_close(taos); } + +TEST(clientMonitorTest, ReadOneFile) { + // Create a TdFilePtr object and set it up for testing + + TdFilePtr pFile = taosOpenFile("./tdengine-1-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); + if (pFile == NULL) { + uError("failed to open file:./test.txt since %s", terrstr()); + return; + } + + const int batch = 10; + const int size = SLOW_LOG_SEND_SIZE/batch; + for(int i = 0; i < batch + 1; i++){ + char value[size] = {0}; + memset(value, '0' + i, size - 1); + if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ + uError("failed to write len to file:%p since %s", pFile, terrstr()); + } + } + + // Create a void pointer and set it up for testing + void* pTransporter = NULL; + + // Create an SEpSet object and set it up for testing + SEpSet* epSet = NULL; + + // Call the function to be tested + monitorReadSendSlowLog(pFile, pTransporter, epSet); + + char value[size] = {0}; + memset(value, '0', size - 1); + 
if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ + uError("failed to write len to file:%p since %s", pFile, terrstr()); + } + + monitorReadSendSlowLog(pFile, pTransporter, epSet); + + // Clean up any resources created for testing + taosCloseFile(&pFile); +} + +TEST(clientMonitorTest, ReadTwoFile) { + // Create a TdFilePtr object and set it up for testing + + TdFilePtr pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-1-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); + if (pFile == NULL) { + uError("failed to open file:./test.txt since %s", terrstr()); + return; + } + + const int batch = 10; + const int size = SLOW_LOG_SEND_SIZE/batch; + for(int i = 0; i < batch + 1; i++){ + char value[size] = {0}; + memset(value, '0' + i, size - 1); + if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ + uError("failed to write len to file:%p since %s", pFile, terrstr()); + } + } + + taosFsyncFile(pFile); + taosCloseFile(&pFile); + + pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-2-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); + if (pFile == NULL) { + uError("failed to open file:./test.txt since %s", terrstr()); + return; + } + + for(int i = 0; i < batch + 1; i++){ + char value[size] = {0}; + memset(value, '0' + i, size - 1); + if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ + uError("failed to write len to file:%p since %s", pFile, terrstr()); + } + } + + taosFsyncFile(pFile); + taosCloseFile(&pFile); + + SAppInstInfo pAppInfo = {0}; + pAppInfo.clusterId = 2; + pAppInfo.monitorParas.tsEnableMonitor = 1; + strcpy(tsTempDir,"/tmp"); + monitorSendAllSlowLogFromTempDir(&pAppInfo); + +} \ No newline at end of file diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index b5bad92dc4..1bc1fb2240 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1525,6 +1525,13 @@ TEST(clientCase, sub_tb_test) { } 
TEST(clientCase, sub_tb_mt_test) { + char *user = NULL; + char *auth = NULL; + char *ip = NULL; + int port = 0; + char key[512] = {0}; + snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port); + taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); TdThread qid[20] = {0}; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c68dc85c29..8b821f9ede 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -178,8 +178,10 @@ int32_t tsRedirectMaxPeriod = 1000; int32_t tsMaxRetryWaitTime = 10000; bool tsUseAdapter = false; int32_t tsMetaCacheMaxSize = -1; // MB -int32_t tsSlowLogThreshold = 3; // seconds -int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL; +int32_t tsSlowLogThreshold = 10; // seconds +int32_t tsSlowLogThresholdTest = 10; // seconds +int32_t tsSlowLogScope = SLOW_LOG_TYPE_QUERY; +int32_t tsSlowLogMaxLen = 4096; int32_t tsTimeSeriesThreshold = 50; bool tsMultiResultFunctionStarReturnTags = false; @@ -541,9 +543,6 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { return -1; if (cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; - if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 0, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) - return -1; - if (cfgAddString(pCfg, "slowLogScope", "", CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; tsNumOfRpcThreads = tsNumOfCores / 2; tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS); @@ -568,9 +567,6 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { return -1; if (cfgAddBool(pCfg, "experimental", tsExperimental, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1; - if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1; - if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; - if (cfgAddBool(pCfg, "multiResultFunctionStarReturnTags", 
tsMultiResultFunctionStarReturnTags, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; @@ -698,6 +694,14 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt64(pCfg, "mndLogRetention", tsMndLogRetention, 500, 10000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "skipGrant", tsMndSkipGrant, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 86400, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + + if (cfgAddInt32(pCfg, "slowLogThresholdTest", tsSlowLogThresholdTest, 0, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 1, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "slowLogMaxLen", tsSlowLogMaxLen, 0, 16384, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + if (cfgAddString(pCfg, "slowLogScope", "", CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "monitorPort", tsMonitorPort, 1, 65056, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; @@ -962,40 +966,52 @@ static void taosSetServerLogCfg(SConfig *pCfg) { sndDebugFlag = cfgGetItem(pCfg, "sndDebugFlag")->i32; } -static int32_t taosSetSlowLogScope(const char *pScope) { +static int32_t taosSetSlowLogScope(char *pScope) { if (NULL == pScope || 0 == strlen(pScope)) { - tsSlowLogScope = SLOW_LOG_TYPE_ALL; - return 0; + return SLOW_LOG_TYPE_QUERY; } - if (0 == strcasecmp(pScope, "all")) { - tsSlowLogScope = SLOW_LOG_TYPE_ALL; - return 0; + int32_t slowScope = 0; + + char* scope = NULL; + char *tmp = NULL; + while((scope = strsep(&pScope, "|")) != NULL){ + 
taosMemoryFreeClear(tmp);
+    tmp = strdup(scope);  // NOTE(review): strdup result is not checked for NULL before strtrim
+    strtrim(tmp);
+    if (0 == strcasecmp(tmp, "all")) {
+      slowScope |= SLOW_LOG_TYPE_ALL;  // NOTE(review): if SLOW_LOG_TYPE_ALL is 0xFFFFFFFF the int32_t result becomes negative, which callers treat as failure - confirm
+      continue;
+    }
+
+    if (0 == strcasecmp(tmp, "query")) {
+      slowScope |= SLOW_LOG_TYPE_QUERY;
+      continue;
+    }
+
+    if (0 == strcasecmp(tmp, "insert")) {
+      slowScope |= SLOW_LOG_TYPE_INSERT;
+      continue;
+    }
+
+    if (0 == strcasecmp(tmp, "others")) {
+      slowScope |= SLOW_LOG_TYPE_OTHERS;
+      continue;
+    }
+
+    if (0 == strcasecmp(tmp, "none")) {
+      slowScope |= SLOW_LOG_TYPE_NULL;
+      continue;
+    }
+
+    uError("Invalid slowLog scope value:%s", tmp);  // report the offending token; strsep has already moved pScope on (NULL after the last token)
+    taosMemoryFreeClear(tmp);
+    terrno = TSDB_CODE_INVALID_CFG_VALUE;
+    return -1;
} - if (0 == strcasecmp(pScope, "query")) { - tsSlowLogScope = SLOW_LOG_TYPE_QUERY; - return 0; - } - - if (0 == strcasecmp(pScope, "insert")) { - tsSlowLogScope = SLOW_LOG_TYPE_INSERT; - return 0; - } - - if (0 == strcasecmp(pScope, "others")) { - tsSlowLogScope = SLOW_LOG_TYPE_OTHERS; - return 0; - } - - if (0 == strcasecmp(pScope, "none")) { - tsSlowLogScope = 0; - return 0; - } - - uError("Invalid slowLog scope value:%s", pScope); - terrno = TSDB_CODE_INVALID_CFG_VALUE; - return -1; + taosMemoryFreeClear(tmp); + return slowScope; } // for common configs @@ -1053,12 +1069,6 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; tsQueryMaxConcurrentTables = cfgGetItem(pCfg, "queryMaxConcurrentTables")->i64; tsMetaCacheMaxSize = cfgGetItem(pCfg, "metaCacheMaxSize")->i32; - tsSlowLogThreshold = cfgGetItem(pCfg, "slowLogThreshold")->i32; - tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; - tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; - if (taosSetSlowLogScope(cfgGetItem(pCfg, "slowLogScope")->str)) { - return -1; - } tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32; tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32; @@ -1131,6 +1141,15 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsSIMDEnable
= (bool)cfgGetItem(pCfg, "simdEnable")->bval; tsTagFilterCache = (bool)cfgGetItem(pCfg, "tagFilterCache")->bval; + tsSlowLogThresholdTest = cfgGetItem(pCfg, "slowLogThresholdTest")->i32; + tsSlowLogThreshold = cfgGetItem(pCfg, "slowLogThreshold")->i32; + tsSlowLogMaxLen = cfgGetItem(pCfg, "slowLogMaxLen")->i32; + int32_t scope = taosSetSlowLogScope(cfgGetItem(pCfg, "slowLogScope")->str); + if(scope < 0){ + return -1; + } + tsSlowLogScope = scope; + tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; tstrncpy(tsMonitorFqdn, cfgGetItem(pCfg, "monitorFqdn")->str, TSDB_FQDN_LEN); @@ -1492,6 +1511,17 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { return 0; } + if (strcasecmp("slowLogScope", name) == 0) { + int32_t scope = taosSetSlowLogScope(pItem->str); + if(scope < 0){ + cfgUnLock(pCfg); + return -1; + } + tsSlowLogScope = scope; + cfgUnLock(pCfg); + return 0; + } + { // 'bool/int32_t/int64_t/float/double' variables with general modification function static OptionNameAndVar debugOptions[] = { {"dDebugFlag", &dDebugFlag}, {"vDebugFlag", &vDebugFlag}, {"mDebugFlag", &mDebugFlag}, @@ -1509,6 +1539,9 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"enableWhiteList", &tsEnableWhiteList}, {"telemetryReporting", &tsEnableTelem}, {"monitor", &tsEnableMonitor}, + {"monitorInterval", &tsMonitorInterval}, + {"slowLogThreshold", &tsSlowLogThreshold}, + {"slowLogMaxLen", &tsSlowLogMaxLen}, {"mndSdbWriteDelta", &tsMndSdbWriteDelta}, {"minDiskFreeSize", &tsMinDiskFreeSize}, @@ -1651,10 +1684,6 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { tsLogSpace.reserved = (int64_t)(((double)pItem->fval) * 1024 * 1024 * 1024); uInfo("%s set to %" PRId64, name, tsLogSpace.reserved); matched = true; - } else if (strcasecmp("monitor", name) == 0) { - tsEnableMonitor = pItem->bval; - uInfo("%s set to %d", name, 
tsEnableMonitor); - matched = true; } break; } @@ -1698,13 +1727,6 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype, false); uInfo("localEp set to '%s', tsFirst set to '%s'", tsLocalEp, tsFirst); matched = true; - } else if (strcasecmp("slowLogScope", name) == 0) { - if (taosSetSlowLogScope(pItem->str)) { - cfgUnLock(pCfg); - return -1; - } - uInfo("%s set to %s", name, pItem->str); - matched = true; } break; } @@ -1765,7 +1787,6 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { {"queryUseNodeAllocator", &tsQueryUseNodeAllocator}, {"smlDot2Underline", &tsSmlDot2Underline}, {"shellActivityTimer", &tsShellActivityTimer}, - {"slowLogThreshold", &tsSlowLogThreshold}, {"useAdapter", &tsUseAdapter}, {"experimental", &tsExperimental}, {"multiResultFunctionStarReturnTags", &tsMultiResultFunctionStarReturnTags}, diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index d465e24d5b..cd663d8ee1 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -69,6 +69,24 @@ pReq->sql = NULL; \ } while (0) +static int32_t tSerializeSMonitorParas(SEncoder *encoder, const SMonitorParas* pMonitorParas) { + if (tEncodeI8(encoder, pMonitorParas->tsEnableMonitor) < 0) return -1; + if (tEncodeI32(encoder, pMonitorParas->tsMonitorInterval) < 0) return -1; + if (tEncodeI32(encoder, pMonitorParas->tsSlowLogScope) < 0) return -1; + if (tEncodeI32(encoder, pMonitorParas->tsSlowLogMaxLen) < 0) return -1; + if (tEncodeI32(encoder, pMonitorParas->tsSlowLogThreshold) < 0) return -1; + return 0; +} + +static int32_t tDeserializeSMonitorParas(SDecoder *decoder, SMonitorParas* pMonitorParas){ + if (tDecodeI8(decoder, (int8_t *)&pMonitorParas->tsEnableMonitor) < 0) return -1; + if (tDecodeI32(decoder, &pMonitorParas->tsMonitorInterval) < 0) return -1; + if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogScope) < 0) return -1; + if (tDecodeI32(decoder, 
&pMonitorParas->tsSlowLogMaxLen) < 0) return -1; + if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogThreshold) < 0) return -1; + return 0; +} + static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq); static int32_t tDecodeSBatchDeleteReqCommon(SDecoder *pDecoder, SBatchDeleteReq *pReq); static int32_t tEncodeTableTSMAInfoRsp(SEncoder *pEncoder, const STableTSMAInfoRsp *pRsp); @@ -513,6 +531,7 @@ int32_t tSerializeSClientHbBatchRsp(void *buf, int32_t bufLen, const SClientHbBa SClientHbRsp *pRsp = taosArrayGet(pBatchRsp->rsps, i); if (tSerializeSClientHbRsp(&encoder, pRsp) < 0) return -1; } + if (tSerializeSMonitorParas(&encoder, &pBatchRsp->monitorParas) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -540,6 +559,10 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR taosArrayPush(pBatchRsp->rsps, &rsp); } + if (!tDecodeIsEnd(&decoder)) { + if (tDeserializeSMonitorParas(&decoder, &pBatchRsp->monitorParas) < 0) return -1; + } + tEndDecode(&decoder); tDecoderClear(&decoder); return 0; @@ -1303,6 +1326,9 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { } if (tEncodeI64(&encoder, pReq->ipWhiteVer) < 0) return -1; + + if (tSerializeSMonitorParas(&encoder, &pReq->clusterCfg.monitorParas) < 0) return -1; + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -1421,6 +1447,10 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tDecodeI64(&decoder, &pReq->ipWhiteVer) < 0) return -1; } + if (!tDecodeIsEnd(&decoder)) { + if (tDeserializeSMonitorParas(&decoder, &pReq->clusterCfg.monitorParas) < 0) return -1; + } + tEndDecode(&decoder); tDecoderClear(&decoder); return 0; @@ -1516,6 +1546,7 @@ int32_t tSerializeSStatisReq(void *buf, int32_t bufLen, SStatisReq *pReq) { if (tEncodeI32(&encoder, pReq->contLen) < 0) return -1; if (tEncodeCStr(&encoder, pReq->pCont) < 0) return -1; + if (tEncodeI8(&encoder, pReq->type) < 0) return -1; 
tEndEncode(&encoder); @@ -1536,7 +1567,9 @@ int32_t tDeserializeSStatisReq(void *buf, int32_t bufLen, SStatisReq *pReq) { if (pReq->pCont == NULL) return -1; if (tDecodeCStrTo(&decoder, pReq->pCont) < 0) return -1; } - + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI8(&decoder, (int8_t*)&pReq->type) < 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); return 0; @@ -5139,6 +5172,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tEncodeI32(&encoder, pRsp->passVer) < 0) return -1; if (tEncodeI32(&encoder, pRsp->authVer) < 0) return -1; if (tEncodeI64(&encoder, pRsp->whiteListVer) < 0) return -1; + if (tSerializeSMonitorParas(&encoder, &pRsp->monitorParas) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -5180,6 +5214,9 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { } else { pRsp->whiteListVer = 0; } + if (!tDecodeIsEnd(&decoder)) { + if (tDeserializeSMonitorParas(&decoder, &pRsp->monitorParas) < 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/common/test/commonTests.cpp b/source/common/test/commonTests.cpp index d30b26e564..af7f3c373f 100644 --- a/source/common/test/commonTests.cpp +++ b/source/common/test/commonTests.cpp @@ -726,4 +726,118 @@ TEST(AlreadyAddGroupIdTest, GroupIdAddedWithDifferentLength) { EXPECT_FALSE(result); } +#define SLOW_LOG_TYPE_NULL 0x0 +#define SLOW_LOG_TYPE_QUERY 0x1 +#define SLOW_LOG_TYPE_INSERT 0x2 +#define SLOW_LOG_TYPE_OTHERS 0x4 +#define SLOW_LOG_TYPE_ALL 0xFFFFFFFF + +static int32_t taosSetSlowLogScope(char *pScope) { + if (NULL == pScope || 0 == strlen(pScope)) { + return SLOW_LOG_TYPE_QUERY; + } + + int32_t slowScope = 0; + + char* scope = NULL; + char *tmp = NULL; + while((scope = strsep(&pScope, "|")) != NULL){ + taosMemoryFreeClear(tmp); + tmp = strdup(scope); + strtrim(tmp); + if (0 == strcasecmp(tmp, "all")) { + slowScope |= SLOW_LOG_TYPE_ALL; + continue; + } + + if (0 == strcasecmp(tmp, 
"query")) { + slowScope |= SLOW_LOG_TYPE_QUERY; + continue; + } + + if (0 == strcasecmp(tmp, "insert")) { + slowScope |= SLOW_LOG_TYPE_INSERT; + continue; + } + + if (0 == strcasecmp(tmp, "others")) { + slowScope |= SLOW_LOG_TYPE_OTHERS; + continue; + } + + if (0 == strcasecmp(tmp, "none")) { + slowScope |= SLOW_LOG_TYPE_NULL; + continue; + } + + taosMemoryFreeClear(tmp); + uError("Invalid slowLog scope value:%s", pScope); + terrno = TSDB_CODE_INVALID_CFG_VALUE; + return -1; + } + + taosMemoryFreeClear(tmp); + return slowScope; +} + +TEST(TaosSetSlowLogScopeTest, NullPointerInput) { + char *pScope = NULL; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, SLOW_LOG_TYPE_QUERY); +} + +TEST(TaosSetSlowLogScopeTest, EmptyStringInput) { + char pScope[1] = ""; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, SLOW_LOG_TYPE_QUERY); +} + +TEST(TaosSetSlowLogScopeTest, AllScopeInput) { + char pScope[] = "all"; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, SLOW_LOG_TYPE_ALL); +} + +TEST(TaosSetSlowLogScopeTest, QueryScopeInput) { + char pScope[] = " query"; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, SLOW_LOG_TYPE_QUERY); +} + +TEST(TaosSetSlowLogScopeTest, InsertScopeInput) { + char pScope[] = "insert"; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, SLOW_LOG_TYPE_INSERT); +} + +TEST(TaosSetSlowLogScopeTest, OthersScopeInput) { + char pScope[] = "others"; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, SLOW_LOG_TYPE_OTHERS); +} + +TEST(TaosSetSlowLogScopeTest, NoneScopeInput) { + char pScope[] = "none"; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, SLOW_LOG_TYPE_NULL); +} + +TEST(TaosSetSlowLogScopeTest, InvalidScopeInput) { + char pScope[] = "invalid"; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, -1); +} + +TEST(TaosSetSlowLogScopeTest, MixedScopesInput) { + char pScope[] = "query|insert|others|none"; + 
int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, (SLOW_LOG_TYPE_QUERY | SLOW_LOG_TYPE_INSERT | SLOW_LOG_TYPE_OTHERS)); +} + +TEST(TaosSetSlowLogScopeTest, MixedScopesInputWithSpaces) { + char pScope[] = "query | insert | others "; + int32_t result = taosSetSlowLogScope(pScope); + EXPECT_EQ(result, (SLOW_LOG_TYPE_QUERY | SLOW_LOG_TYPE_INSERT | SLOW_LOG_TYPE_OTHERS)); +} + #pragma GCC diagnostic pop diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 800f7f7864..cb0bc6102c 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -116,6 +116,11 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0; req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat; req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum; + req.clusterCfg.monitorParas.tsEnableMonitor = tsEnableMonitor; + req.clusterCfg.monitorParas.tsMonitorInterval = tsMonitorInterval; + req.clusterCfg.monitorParas.tsSlowLogScope = tsSlowLogScope; + req.clusterCfg.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen; + req.clusterCfg.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold; char timestr[32] = "1970-01-01 00:00:00.00"; (void)taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index d02aec98ca..35900fc1dd 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -438,7 +438,20 @@ void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) { } } +#define CHECK_MONITOR_PARA(para) \ +if (pCfg->monitorParas.para != para) { \ + mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \ + terrno = TSDB_CODE_DNODE_INVALID_STATUS_INTERVAL; \ + 
return DND_REASON_STATUS_INTERVAL_NOT_MATCH;\ +} + static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) { + CHECK_MONITOR_PARA(tsEnableMonitor); + CHECK_MONITOR_PARA(tsMonitorInterval); + CHECK_MONITOR_PARA(tsSlowLogThreshold); + CHECK_MONITOR_PARA(tsSlowLogMaxLen); + CHECK_MONITOR_PARA(tsSlowLogScope); + if (pCfg->statusInterval != tsStatusInterval) { mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusInterval, tsStatusInterval); @@ -530,6 +543,8 @@ static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) { return stateChanged; } +extern char* tsMonFwUri; +extern char* tsMonSlowLogUri; static int32_t mndProcessStatisReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStatisReq statisReq = {0}; @@ -547,188 +562,14 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) { mInfo("process statis req,\n %s", statisReq.pCont); } - SJson *pJson = tjsonParse(statisReq.pCont); - - int32_t ts_size = tjsonGetArraySize(pJson); - - for (int32_t i = 0; i < ts_size; i++) { - SJson *item = tjsonGetArrayItem(pJson, i); - - SJson *tables = tjsonGetObjectItem(item, "tables"); - - int32_t tableSize = tjsonGetArraySize(tables); - for (int32_t i = 0; i < tableSize; i++) { - SJson *table = tjsonGetArrayItem(tables, i); - - char tableName[MONITOR_TABLENAME_LEN] = {0}; - tjsonGetStringValue(table, "name", tableName); - - SJson *metricGroups = tjsonGetObjectItem(table, "metric_groups"); - - int32_t size = tjsonGetArraySize(metricGroups); - for (int32_t i = 0; i < size; i++) { - SJson *item = tjsonGetArrayItem(metricGroups, i); - - SJson *arrayTag = tjsonGetObjectItem(item, "tags"); - - int32_t tagSize = tjsonGetArraySize(arrayTag); - for (int32_t j = 0; j < tagSize; j++) { - SJson *item = tjsonGetArrayItem(arrayTag, j); - - char tagName[MONITOR_TAG_NAME_LEN] = {0}; - tjsonGetStringValue(item, "name", tagName); - - if (strncmp(tagName, "cluster_id", MONITOR_TAG_NAME_LEN) == 0) { - 
tjsonDeleteItemFromObject(item, "value"); - tjsonAddStringToObject(item, "value", strClusterId); - } - } - } - } - } - - char *pCont = tjsonToString(pJson); - monSendContent(pCont); - - if (pJson != NULL) { - tjsonDelete(pJson); - pJson = NULL; - } - - if (pCont != NULL) { - taosMemoryFree(pCont); - pCont = NULL; + if (statisReq.type == MONITOR_TYPE_COUNTER){ + monSendContent(statisReq.pCont, tsMonFwUri); + }else if(statisReq.type == MONITOR_TYPE_SLOW_LOG){ + monSendContent(statisReq.pCont, tsMonSlowLogUri); } tFreeSStatisReq(&statisReq); return 0; - - /* - SJson* pJson = tjsonParse(statisReq.pCont); - - int32_t ts_size = tjsonGetArraySize(pJson); - - for(int32_t i = 0; i < ts_size; i++){ - SJson* item = tjsonGetArrayItem(pJson, i); - - SJson* tables = tjsonGetObjectItem(item, "tables"); - - int32_t tableSize = tjsonGetArraySize(tables); - for(int32_t i = 0; i < tableSize; i++){ - SJson* table = tjsonGetArrayItem(tables, i); - - char tableName[MONITOR_TABLENAME_LEN] = {0}; - tjsonGetStringValue(table, "name", tableName); - - SJson* metricGroups = tjsonGetObjectItem(table, "metric_groups"); - - int32_t size = tjsonGetArraySize(metricGroups); - for(int32_t i = 0; i < size; i++){ - SJson* item = tjsonGetArrayItem(metricGroups, i); - - SJson* arrayTag = tjsonGetObjectItem(item, "tags"); - - int32_t tagSize = tjsonGetArraySize(arrayTag); - - char** labels = taosMemoryMalloc(sizeof(char*) * tagSize); - char** sample_labels = taosMemoryMalloc(sizeof(char*) * tagSize); - - for(int32_t j = 0; j < tagSize; j++){ - SJson* item = tjsonGetArrayItem(arrayTag, j); - - *(labels + j) = taosMemoryMalloc(MONITOR_TAG_NAME_LEN); - tjsonGetStringValue(item, "name", *(labels + j)); - - *(sample_labels + j) = taosMemoryMalloc(MONITOR_TAG_VALUE_LEN); - tjsonGetStringValue(item, "value", *(sample_labels + j)); - if(strncmp(*(labels + j), "cluster_id", MONITOR_TAG_NAME_LEN) == 0) { - strncpy(*(sample_labels + j), strClusterId, MONITOR_TAG_VALUE_LEN); - } - } - - SJson* metrics = 
tjsonGetObjectItem(item, "metrics"); - - int32_t metricLen = tjsonGetArraySize(metrics); - for(int32_t j = 0; j < metricLen; j++){ - SJson *item = tjsonGetArrayItem(metrics, j); - - char name[MONITOR_METRIC_NAME_LEN] = {0}; - tjsonGetStringValue(item, "name", name); - - double value = 0; - tjsonGetDoubleValue(item, "value", &value); - - double type = 0; - tjsonGetDoubleValue(item, "type", &type); - - int32_t metricNameLen = strlen(name) + strlen(tableName) + 2; - char* metricName = taosMemoryMalloc(metricNameLen); - memset(metricName, 0, metricNameLen); - sprintf(metricName, "%s:%s", tableName, name); - - taos_metric_t* metric = taos_collector_registry_get_metric(metricName); - if(metric == NULL){ - if(type == 0){ - metric = taos_counter_new(metricName, "", tagSize, (const char**)labels); - } - if(type == 1){ - metric = taos_gauge_new(metricName, "", tagSize, (const char**)labels); - } - mTrace("fail to get metric from registry, new one metric:%p", metric); - - if(taos_collector_registry_register_metric(metric) == 1){ - if(type == 0){ - taos_counter_destroy(metric); - } - if(type == 1){ - taos_gauge_destroy(metric); - } - - metric = taos_collector_registry_get_metric(metricName); - - mTrace("fail to register metric, get metric from registry:%p", metric); - } - else{ - mTrace("succeed to register metric:%p", metric); - } - } - else{ - mTrace("get metric from registry:%p", metric); - } - - if(type == 0){ - taos_counter_add(metric, value, (const char**)sample_labels); - } - if(type == 1){ - taos_gauge_set(metric, value, (const char**)sample_labels); - } - - taosMemoryFreeClear(metricName); - } - - for(int32_t j = 0; j < tagSize; j++){ - taosMemoryFreeClear(*(labels + j)); - taosMemoryFreeClear(*(sample_labels + j)); - } - - taosMemoryFreeClear(sample_labels); - taosMemoryFreeClear(labels); - } - } - - } - - code = 0; - - _OVER: - if(pJson != NULL){ - tjsonDelete(pJson); - pJson = NULL; - } - - tFreeSStatisReq(&statisReq); - return code; - */ } static int32_t 
mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) { diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index fafdb539fb..2b83419dce 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -281,7 +281,6 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { } } -_CONNECT: pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp, pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime); if (pConn == NULL) { @@ -301,6 +300,11 @@ _CONNECT: connectRsp.passVer = pUser->passVersion; connectRsp.authVer = pUser->authVersion; connectRsp.whiteListVer = pUser->ipWhiteListVer; + connectRsp.monitorParas.tsEnableMonitor = tsEnableMonitor; + connectRsp.monitorParas.tsMonitorInterval = tsMonitorInterval; + connectRsp.monitorParas.tsSlowLogScope = tsSlowLogScope; + connectRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen; + connectRsp.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold; strcpy(connectRsp.sVer, version); snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, @@ -657,6 +661,11 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) { SClientHbBatchRsp batchRsp = {0}; batchRsp.svrTimestamp = taosGetTimestampSec(); batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp)); + batchRsp.monitorParas.tsEnableMonitor = tsEnableMonitor; + batchRsp.monitorParas.tsMonitorInterval = tsMonitorInterval; + batchRsp.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold; + batchRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen; + batchRsp.monitorParas.tsSlowLogScope = tsSlowLogScope; int32_t sz = taosArrayGetSize(batchReq.reqs); for (int i = 0; i < sz; i++) { diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index c5c14950b2..6aaa79bb31 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -363,7 
+363,7 @@ typedef struct SCtgUserAuth { } SCtgUserAuth; typedef struct SCatalog { - uint64_t clusterId; + int64_t clusterId; bool stopUpdate; SDynViewVersion dynViewVer; SHashObj* userCache; // key:user, value:SCtgUserAuth diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index e7d5a89d6f..4048c8841b 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -834,7 +834,7 @@ int32_t catalogInit(SCatalogCfg* cfg) { return TSDB_CODE_SUCCESS; } -int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { +int32_t catalogGetHandle(int64_t clusterId, SCatalog** catalogHandle) { if (NULL == catalogHandle) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 6da1da2b52..eac716339b 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -410,7 +410,7 @@ void ctgFreeHandle(SCatalog* pCtg) { return; } - uint64_t clusterId = pCtg->clusterId; + int64_t clusterId = pCtg->clusterId; ctgFreeMetaRent(&pCtg->dbRent); ctgFreeMetaRent(&pCtg->stbRent); @@ -498,7 +498,7 @@ void ctgClearHandle(SCatalog* pCtg) { return; } - uint64_t clusterId = pCtg->clusterId; + int64_t clusterId = pCtg->clusterId; ctgFreeMetaRent(&pCtg->dbRent); ctgFreeMetaRent(&pCtg->stbRent); diff --git a/source/libs/monitor/src/clientMonitor.c b/source/libs/monitor/src/clientMonitor.c deleted file mode 100644 index 0891932583..0000000000 --- a/source/libs/monitor/src/clientMonitor.c +++ /dev/null @@ -1,210 +0,0 @@ -#include "clientMonitor.h" -#include "os.h" -#include "tmisce.h" -#include "ttime.h" -#include "ttimer.h" -#include "tglobal.h" - -SRWLatch monitorLock; -void* tmrClientMonitor; -tmr_h tmrStartHandle; -SHashObj* clusterMonitorInfoTable; - -static int interval = 30 * 1000; -static int sendBathchSize = 1; - -int32_t sendReport(ClientMonitor* pMonitor, char* pCont); -void generateClusterReport(ClientMonitor* pMonitor, 
bool send) { - char ts[50]; - sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI)); - char* pCont = (char*)taos_collector_registry_bridge_new(pMonitor->registry, ts, "%" PRId64, NULL); - if(NULL == pCont) { - uError("generateClusterReport failed, get null content."); - return; - } - if (send && strlen(pCont) != 0) { - if (sendReport(pMonitor, pCont) == 0) { - taos_collector_registry_clear_batch(pMonitor->registry); - } - } - taosMemoryFreeClear(pCont); -} - -void reportSendProcess(void* param, void* tmrId) { - taosTmrReset(reportSendProcess, tsMonitorInterval * 1000, NULL, tmrClientMonitor, &tmrStartHandle); - taosRLockLatch(&monitorLock); - - static int index = 0; - index++; - ClientMonitor** ppMonitor = (ClientMonitor**)taosHashIterate(clusterMonitorInfoTable, NULL); - while (ppMonitor != NULL && *ppMonitor != NULL) { - ClientMonitor* pMonitor = *ppMonitor; - generateClusterReport(*ppMonitor, index == sendBathchSize); - ppMonitor = taosHashIterate(clusterMonitorInfoTable, ppMonitor); - } - - if (index == sendBathchSize) index = 0; - taosRUnLockLatch(&monitorLock); -} - -void monitorClientInitOnce() { - static int8_t init = 0; - if (atomic_exchange_8(&init, 1) == 0) { - uInfo("tscMonitorInit once."); - clusterMonitorInfoTable = - (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - - tmrClientMonitor = taosTmrInit(0, 0, 0, "MONITOR"); - tmrStartHandle = taosTmrStart(reportSendProcess, tsMonitorInterval * 1000, NULL, tmrClientMonitor); - if(tsMonitorInterval < 1){ - interval = 30 * 1000; - } else { - interval = tsMonitorInterval * 1000; - } - if (tsMonitorInterval < 10) { - sendBathchSize = (10 / sendBathchSize) + 1; - } - taosInitRWLatch(&monitorLock); - } -} - -void createMonitorClient(const char* clusterKey, SEpSet epSet, void* pTransporter) { - if (clusterKey == NULL || strlen(clusterKey) == 0) { - uError("createMonitorClient failed, clusterKey is NULL"); - return; - } - 
taosWLockLatch(&monitorLock); - if (taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey)) == NULL) { - uInfo("createMonitorClient for %s.", clusterKey); - ClientMonitor* pMonitor = taosMemoryCalloc(1, sizeof(ClientMonitor)); - snprintf(pMonitor->clusterKey, sizeof(pMonitor->clusterKey), "%s", clusterKey); - pMonitor->registry = taos_collector_registry_new(clusterKey); - pMonitor->colector = taos_collector_new(clusterKey); - epsetAssign(&pMonitor->epSet, &epSet); - pMonitor->pTransporter = pTransporter; - - taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector); - pMonitor->counters = - (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - - taosHashPut(clusterMonitorInfoTable, clusterKey, strlen(clusterKey), &pMonitor, sizeof(ClientMonitor*)); - uInfo("createMonitorClient for %s finished %p.", clusterKey, pMonitor); - } - taosWUnLockLatch(&monitorLock); -} - -static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) { - static int32_t emptyRspNum = 0; - if (TSDB_CODE_SUCCESS != code) { - uError("found error in monitorReport send callback, code:%d, please check the network.", code); - } - if (pMsg) { - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); - } - return code; -} - -int32_t sendReport(ClientMonitor* pMonitor, char* pCont) { - SStatisReq sStatisReq; - sStatisReq.pCont = pCont; - sStatisReq.contLen = strlen(pCont); - - int tlen = tSerializeSStatisReq(NULL, 0, &sStatisReq); - if (tlen < 0) return 0; - void* buf = taosMemoryMalloc(tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - tSerializeSStatisReq(buf, tlen, &sStatisReq); - - SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); - if (pInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pInfo->fp = monitorReportAsyncCB; - pInfo->msgInfo.pData = buf; - pInfo->msgInfo.len = tlen; - pInfo->msgType = TDMT_MND_STATIS; - // 
pInfo->param = taosMemoryMalloc(sizeof(int32_t)); - // *(int32_t*)pInfo->param = i; - pInfo->paramFreeFp = taosMemoryFree; - pInfo->requestId = tGenIdPI64(); - pInfo->requestObjRefId = 0; - - int64_t transporterId = 0; - return asyncSendMsgToServer(pMonitor->pTransporter, &pMonitor->epSet, &transporterId, pInfo); -} - -void clusterMonitorInit(const char* clusterKey, SEpSet epSet, void* pTransporter) { - monitorClientInitOnce(); - createMonitorClient(clusterKey, epSet, pTransporter); -} - -taos_counter_t* createClusterCounter(const char* clusterKey, const char* name, const char* help, size_t label_key_count, - const char** label_keys) { - ClientMonitor** ppMonitor = (ClientMonitor**)taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey)); - - if (ppMonitor != NULL && *ppMonitor != NULL) { - ClientMonitor* pMonitor = *ppMonitor; - taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, name, strlen(name)); - if (ppCounter != NULL && *ppCounter != NULL) { - taosHashRemove(pMonitor->counters, name, strlen(name)); - uInfo("createClusterCounter remove old counter: %s.", name); - } - - taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys); - if (newCounter != NULL) { - taos_collector_add_metric(pMonitor->colector, newCounter); - taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, sizeof(taos_counter_t*)); - uInfo("createClusterCounter %s(%p):%s : %p.", pMonitor->clusterKey, pMonitor, name, newCounter); - return newCounter; - } else { - return NULL; - } - } else { - return NULL; - } - return NULL; -} - -int taosClusterCounterInc(const char* clusterKey, const char* counterName, const char** label_values) { - taosRLockLatch(&monitorLock); - ClientMonitor** ppMonitor = (ClientMonitor**)taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey)); - - if (ppMonitor != NULL && *ppMonitor != NULL) { - ClientMonitor* pMonitor = *ppMonitor; - taos_counter_t** ppCounter = 
(taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName)); - if (ppCounter != NULL && *ppCounter != NULL) { - taos_counter_inc(*ppCounter, label_values); - } else { - uError("taosClusterCounterInc not found pCounter %s:%s.", clusterKey, counterName); - } - } else { - uError("taosClusterCounterInc not found pMonitor %s.", clusterKey); - } - taosRUnLockLatch(&monitorLock); - return 0; -} - -void clusterMonitorClose(const char* clusterKey) { - taosWLockLatch(&monitorLock); - ClientMonitor** ppMonitor = (ClientMonitor**)taosHashGet(clusterMonitorInfoTable, clusterKey, strlen(clusterKey)); - - if (ppMonitor != NULL && *ppMonitor != NULL) { - ClientMonitor* pMonitor = *ppMonitor; - uInfo("clusterMonitorClose valule:%p clusterKey:%s.", pMonitor, pMonitor->clusterKey); - taosHashCleanup(pMonitor->counters); - taos_collector_registry_destroy(pMonitor->registry); - taosMemoryFree(pMonitor); - taosHashRemove(clusterMonitorInfoTable, clusterKey, strlen(clusterKey)); - } - taosWUnLockLatch(&monitorLock); -} - -const char* resultStr(SQL_RESULT_CODE code) { - static const char* result_state[] = {"Success", "Failed", "Cancel"}; - return result_state[code]; -} diff --git a/source/libs/monitor/src/monMain.c b/source/libs/monitor/src/monMain.c index 4ecc6f1261..21c196872c 100644 --- a/source/libs/monitor/src/monMain.c +++ b/source/libs/monitor/src/monMain.c @@ -24,6 +24,7 @@ SMonitor tsMonitor = {0}; char* tsMonUri = "/report"; char* tsMonFwUri = "/general-metric"; +char* tsMonSlowLogUri = "/slow-sql-detail-batch"; char* tsMonFwBasicUri = "/taosd-cluster-basic"; void monRecordLog(int64_t ts, ELogLevel level, const char *content) { @@ -631,7 +632,7 @@ void monGenAndSendReportBasic() { monCleanupMonitorInfo(pMonitor); } -void monSendContent(char *pCont) { +void monSendContent(char *pCont, const char* uri) { if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return; if(tsMonitorLogProtocol){ if (pCont != NULL){ @@ -640,7 +641,7 @@ void 
monSendContent(char *pCont) { } if (pCont != NULL) { EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT; - if (taosSendHttpReport(tsMonitor.cfg.server, tsMonFwUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) { + if (taosSendHttpReport(tsMonitor.cfg.server, uri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) { uError("failed to send monitor msg"); } } diff --git a/source/os/src/osEnv.c b/source/os/src/osEnv.c index 54107db325..3ebf20e94e 100644 --- a/source/os/src/osEnv.c +++ b/source/os/src/osEnv.c @@ -77,7 +77,9 @@ void osDefaultInit() { } strcpy(tsDataDir, TD_DATA_DIR_PATH); strcpy(tsLogDir, TD_LOG_DIR_PATH); - strcpy(tsTempDir, TD_TMP_DIR_PATH); + if(strlen(tsTempDir) == 0){ + strcpy(tsTempDir, TD_TMP_DIR_PATH); + } } void osUpdate() { diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index bdd43fe9fa..ac6cf7bad2 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -66,7 +66,7 @@ typedef struct TdFile { void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath) { #ifdef WINDOWS - const char *tdengineTmpFileNamePrefix = "tdengine-"; + char tmpPath[PATH_MAX]; int32_t len = (int32_t)strlen(inputTmpDir); @@ -76,7 +76,7 @@ void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, cha tmpPath[len++] = '\\'; } - strcpy(tmpPath + len, tdengineTmpFileNamePrefix); + strcpy(tmpPath + len, TD_TMP_FILE_PREFIX); if (strlen(tmpPath) + strlen(fileNamePrefix) + strlen("-%d-%s") < PATH_MAX) { strcat(tmpPath, fileNamePrefix); strcat(tmpPath, "-%d-%s"); @@ -88,8 +88,6 @@ void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, cha #else - const char *tdengineTmpFileNamePrefix = "tdengine-"; - char tmpPath[PATH_MAX]; int32_t len = strlen(inputTmpDir); memcpy(tmpPath, inputTmpDir, len); @@ -99,7 +97,7 @@ void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, cha tmpPath[len++] = '/'; } - strcpy(tmpPath + len, tdengineTmpFileNamePrefix); 
+ strcpy(tmpPath + len, TD_TMP_FILE_PREFIX); if (strlen(tmpPath) + strlen(fileNamePrefix) + strlen("-%d-%s") < PATH_MAX) { strcat(tmpPath, fileNamePrefix); strcat(tmpPath, "-%d-%s");