diff --git a/include/libs/monitor/clientMonitor.h b/include/libs/monitor/clientMonitor.h index 68b55e71a9..4c7ab6f65a 100644 --- a/include/libs/monitor/clientMonitor.h +++ b/include/libs/monitor/clientMonitor.h @@ -25,13 +25,20 @@ extern "C" { #include "query.h" #include "tqueue.h" -typedef enum SQL_RESULT_CODE { +typedef enum { SQL_RESULT_SUCCESS = 0, SQL_RESULT_FAILED = 1, SQL_RESULT_CANCEL = 2, } SQL_RESULT_CODE; -#define SLOW_LOG_SEND_SIZE 32*1024 +typedef enum { + SLOW_LOG_WRITE = 0, + SLOW_LOG_READ_RUNNING = 1, + SLOW_LOG_READ_BEGINNIG = 2, + SLOW_LOG_READ_QUIT = 3, +} SLOW_LOG_QUEUE_TYPE; + +#define SLOW_LOG_SEND_SIZE_MAX 1024*1024 typedef struct { int64_t clusterId; @@ -43,12 +50,18 @@ typedef struct { typedef struct { TdFilePtr pFile; - void* timer; + int64_t lastCheckTime; + char path[PATH_MAX]; + int64_t offset; } SlowLogClient; typedef struct { - int64_t clusterId; - char *value; + int64_t clusterId; + SLOW_LOG_QUEUE_TYPE type; + char* data; + int64_t offset; + TdFilePtr pFile; + char* fileName; } MonitorSlowLogData; void monitorClose(); @@ -60,7 +73,7 @@ void monitorCreateClient(int64_t clusterId); void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count, const char** label_keys); void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values); const char* monitorResultStr(SQL_RESULT_CODE code); -int32_t monitorPutData2MonitorQueue(int64_t clusterId, char* value); +int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data); #ifdef __cplusplus } #endif diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index af396876bb..3a821768f8 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -157,7 +157,11 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_ } char* value = cJSON_PrintUnformatted(json); - if(monitorPutData2MonitorQueue(pTscObj->pAppInfo->clusterId, value) < 0){ + MonitorSlowLogData data = {0}; + data.clusterId = pTscObj->pAppInfo->clusterId; + data.type = SLOW_LOG_WRITE; + data.data = value; + if(monitorPutData2MonitorQueue(data) < 0){ taosMemoryFree(value); } @@ -202,24 +206,24 @@ static void deregisterRequest(SRequestObj *pRequest) { pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst); if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) { - if (pRequest->pQuery && pRequest->pQuery->pRoot) { - if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type && - (0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) { - tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64 - "us, planCost:%" PRId64 "us, exec:%" PRId64 "us", - duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, - pRequest->metric.planCostUs, pRequest->metric.execCostUs); - atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); - reqType = SLOW_LOG_TYPE_INSERT; - } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { - tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64 - "us, planCost:%" PRId64 "us, exec:%" PRId64 "us", - duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, - pRequest->metric.planCostUs, pRequest->metric.execCostUs); + if ((pRequest->pQuery && pRequest->pQuery->pRoot && + QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type && + (0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) || + QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType) { + tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64 + "us, planCost:%" PRId64 "us, exec:%" PRId64 "us", + duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, + pRequest->metric.planCostUs, pRequest->metric.execCostUs); + atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); + reqType = SLOW_LOG_TYPE_INSERT; + } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { + tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64 + "us, planCost:%" PRId64 "us, exec:%" PRId64 "us", + duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, + pRequest->metric.planCostUs, pRequest->metric.execCostUs); - atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); - reqType = SLOW_LOG_TYPE_QUERY; - } + atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); + reqType = SLOW_LOG_TYPE_QUERY; } nodesSimReleaseAllocator(pRequest->allocatorRefId); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 83c0783458..1b7b9263a5 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -19,6 +19,7 @@ #include "scheduler.h" #include "trpc.h" #include "tglobal.h" +#include "clientMonitor.h" typedef struct { union { @@ -546,10 +547,6 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { } SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo; - pInst->monitorParas = pRsp.monitorParas; - tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", - pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope); - if (code != 0) { pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1; tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), pInst->onlineDnodes, pInst->totalDnodes); @@ -560,6 +557,10 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { return -1; } + pInst->monitorParas = pRsp.monitorParas; + tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", + pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope); + if (rspNum) { tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0)); diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index 9e990dd545..479ea76fe3 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -1,4 +1,5 @@ #include "clientMonitor.h" +#include "clientLog.h" #include "os.h" #include "tmisce.h" #include "ttime.h" @@ -13,6 +14,7 @@ void* monitorTimer; SHashObj* monitorCounterHash; int32_t slowLogFlag = -1; int32_t monitorFlag = -1; +int32_t quitCnt = 0; tsem2_t monitorSem; STaosQueue* monitorQueue; SHashObj* monitorSlowLogHash; @@ -23,41 +25,40 @@ static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size){ } int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir); if (ret < 0){ - uError("failed to get tmp path ret:%d", ret); + tscError("failed to get tmp path ret:%d", ret); return ret; } return 0; } -//static void destroyCounter(void* data){ -// if (data == NULL) { -// return; -// } -// taos_counter_t* conuter = *(taos_counter_t**)data; -// if(conuter == NULL){ -// return; -// } -// taos_counter_destroy(conuter); -//} +static void processFileInTheEnd(TdFilePtr pFile, char* path){ + if(pFile == NULL){ + return; + } + if(taosFtruncateFile(pFile, 0) != 0){ + tscError("failed to truncate file:%s, errno:%d", path, errno); + return; + } + if(taosUnLockFile(pFile) != 0){ + tscError("failed to unlock file:%s, errno:%d", path, errno); + return; + } + if(taosCloseFile(&(pFile)) != 0){ + tscError("failed to close file:%s, errno:%d", path, errno); + return; + } + if(taosRemoveFile(path) != 0){ + tscError("failed to remove file:%s, errno:%d", path, errno); + return; + } +} static void destroySlowLogClient(void* data){ if (data == NULL) { return; } SlowLogClient* slowLogClient = *(SlowLogClient**)data; - if(slowLogClient == NULL){ - return; - } - taosTmrStopA(&(*(SlowLogClient**)data)->timer); - - TdFilePtr pFile = slowLogClient->pFile; - if(pFile == NULL){ - taosMemoryFree(slowLogClient); - return; - } - - taosUnLockFile(pFile); - taosCloseFile(&pFile); + processFileInTheEnd(slowLogClient->pFile, slowLogClient->path); taosMemoryFree(slowLogClient); } @@ -76,10 +77,26 @@ static void destroyMonitorClient(void* data){ taosMemoryFree(pMonitor); } +static void monitorFreeSlowLogData(void *paras) { + MonitorSlowLogData* pData = (MonitorSlowLogData*)paras; + if (pData == NULL) { + return; + } + taosMemoryFreeClear(pData->data); + if (pData->type == SLOW_LOG_READ_BEGINNIG){ + taosMemoryFree(pData->fileName); + } +} + +static void monitorFreeSlowLogDataEx(void *paras) { + monitorFreeSlowLogData(paras); + taosMemoryFree(paras); +} + static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) { void *p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES); if(p == NULL){ - uError("failed to get app inst, clusterId:%" PRIx64, clusterId); + tscError("failed to get app inst, clusterId:%" PRIx64, clusterId); return NULL; } return *(SAppInstInfo**)p; @@ -87,106 +104,66 @@ static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) { static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) { if (TSDB_CODE_SUCCESS != code) { - uError("found error in monitorReport send callback, code:%d, please check the network.", code); + tscError("found error in monitorReport send callback, code:%d, please check the network.", code); } if (pMsg) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); } + if(param != NULL){ + MonitorSlowLogData* p = (MonitorSlowLogData*)param; + if(code != 0){ + tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId); + } + MonitorSlowLogData tmp = {.clusterId = p->clusterId, .type = p->type, .fileName = p->fileName, + .pFile= p->pFile, .offset = p->offset, .data = NULL}; + if(monitorPutData2MonitorQueue(tmp) == 0){ + p->fileName = NULL; + } + } return code; } -static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITOR_TYPE type) { +static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITOR_TYPE type, void* param) { SStatisReq sStatisReq; sStatisReq.pCont = pCont; sStatisReq.contLen = strlen(pCont); sStatisReq.type = type; int tlen = tSerializeSStatisReq(NULL, 0, &sStatisReq); - if (tlen < 0) return 0; + if (tlen < 0) { + goto FAILED; + } void* buf = taosMemoryMalloc(tlen); if (buf == NULL) { - uError("sendReport failed, out of memory, len:%d", tlen); + tscError("sendReport failed, out of memory, len:%d", tlen); terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + goto FAILED; } tSerializeSStatisReq(buf, tlen, &sStatisReq); SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (pInfo == NULL) { - uError("sendReport failed, out of memory send info"); + tscError("sendReport failed, out of memory send info"); terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + taosMemoryFree(buf); + goto FAILED; } pInfo->fp = monitorReportAsyncCB; pInfo->msgInfo.pData = buf; pInfo->msgInfo.len = tlen; pInfo->msgType = TDMT_MND_STATIS; - // pInfo->param = taosMemoryMalloc(sizeof(int32_t)); - // *(int32_t*)pInfo->param = i; - pInfo->paramFreeFp = taosMemoryFree; + pInfo->param = param; + pInfo->paramFreeFp = monitorFreeSlowLogDataEx; pInfo->requestId = tGenIdPI64(); pInfo->requestObjRefId = 0; int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo); - if (code != TSDB_CODE_SUCCESS) { - uError("sendReport failed, code:%d", code); - } - return code; -} + return asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo); -static void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *epSet){ - char buf[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log - char pCont[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log - int32_t offset = 0; - if(taosLSeekFile(pFile, 0, SEEK_SET) < 0){ - uError("failed to seek file:%p code: %d", pFile, errno); - return; - } - while(1){ - int64_t readSize = taosReadFile(pFile, buf + offset, SLOW_LOG_SEND_SIZE - offset); - if (readSize <= 0) { - uError("failed to read len from file:%p since %s", pFile, terrstr()); - return; - } - - memset(pCont, 0, sizeof(pCont)); - strcat(pCont, "["); - char* string = buf; - for(int i = 0; i < readSize + offset; i++){ - if (buf[i] == '\0') { - if (string != buf) strcat(pCont, ","); - strcat(pCont, string); - uDebug("[monitor] monitorReadSendSlowLog slow log:%s", string); - string = buf + i + 1; - } - } - strcat(pCont, "]"); - if (pTransporter && pCont != NULL) { - if(sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_SLOW_LOG) != 0){ - if(taosLSeekFile(pFile, -readSize, SEEK_CUR) < 0){ - uError("failed to seek file:%p code: %d", pFile, errno); - } - uError("failed to send report:%s", pCont); - return; - } - uDebug("[monitor] monitorReadSendSlowLog send slow log to mnode:%s", pCont) - } - - if (readSize + offset < SLOW_LOG_SEND_SIZE) { - break; - } - offset = SLOW_LOG_SEND_SIZE - (string - buf); - if(buf != string && offset != 0){ - memmove(buf, string, offset); - uDebug("[monitor] monitorReadSendSlowLog left slow log:%s", buf) - } - } - if(taosFtruncateFile(pFile, 0) < 0){ - uError("failed to truncate file:%p code: %d", pFile, errno); - } - uDebug("[monitor] monitorReadSendSlowLog send slow log file:%p", pFile); +FAILED: + monitorFreeSlowLogDataEx(param); + return -1; } static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet *epSet) { @@ -194,11 +171,11 @@ static void generateClusterReport(taos_collector_registry_t* registry, void* pTr sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI)); char* pCont = (char*)taos_collector_registry_bridge_new(registry, ts, "%" PRId64, NULL); if(NULL == pCont) { - uError("generateClusterReport failed, get null content."); + tscError("generateClusterReport failed, get null content."); return; } - if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER) == 0) { + if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER, NULL) == 0) { taos_collector_registry_clear_batch(registry); } taosMemoryFreeClear(pCont); @@ -220,109 +197,24 @@ static void reportSendProcess(void* param, void* tmrId) { SEpSet ep = getEpSet_s(&pInst->mgmtEp); generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep); - taosRUnLockLatch(&monitorLock); taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId); -} - -static void sendAllSlowLog(){ - void* data = taosHashIterate(monitorSlowLogHash, NULL); - while (data != NULL) { - TdFilePtr pFile = (*(SlowLogClient**)data)->pFile; - if (pFile != NULL){ - int64_t clusterId = *(int64_t*)taosHashGetKey(data, NULL); - SAppInstInfo* pInst = getAppInstByClusterId(clusterId); - if(pInst == NULL){ - taosHashCancelIterate(monitorSlowLogHash, data); - break; - } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - monitorReadSendSlowLog(pFile, pInst->pTransporter, &ep); - } - data = taosHashIterate(monitorSlowLogHash, data); - } - uDebug("[monitor] sendAllSlowLog when client close"); -} - -static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ - SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId); - - if(pInst == NULL || !pInst->monitorParas.tsEnableMonitor){ - uInfo("[monitor] monitor is disabled, skip send slow log"); - return; - } - char namePrefix[PATH_MAX] = {0}; - if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, pInst->clusterId) < 0) { - uError("failed to generate slow log file name prefix"); - return; - } - - taosRLockLatch(&monitorLock); - - char tmpPath[PATH_MAX] = {0}; - if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) { - goto END; - } - - TdDirPtr pDir = taosOpenDir(tmpPath); - if (pDir == NULL) { - goto END; - } - - TdDirEntryPtr de = NULL; - while ((de = taosReadDir(pDir)) != NULL) { - if (taosDirEntryIsDir(de)) { - continue; - } - - char *name = taosGetDirEntryName(de); - if (strcmp(name, ".") == 0 || - strcmp(name, "..") == 0 || - strstr(name, namePrefix) == NULL) { - uInfo("skip file:%s, for cluster id:%"PRIx64, name, pInst->clusterId); - continue; - } - - char filename[PATH_MAX] = {0}; - snprintf(filename, sizeof(filename), "%s%s", tmpPath, name); - TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ); - if (pFile == NULL) { - uError("failed to open file:%s since %s", filename, terrstr()); - continue; - } - if (taosLockFile(pFile) < 0) { - uError("failed to lock file:%s since %s, maybe used by other process", filename, terrstr()); - taosCloseFile(&pFile); - continue; - } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - monitorReadSendSlowLog(pFile, pInst->pTransporter, &ep); - taosUnLockFile(pFile); - taosCloseFile(&pFile); - taosRemoveFile(filename); - uDebug("[monitor] send and delete slow log file when reveive connect rsp:%s", filename); - - } - - taosCloseDir(&pDir); - -END: taosRUnLockLatch(&monitorLock); } static void sendAllCounter(){ - MonitorClient** ppMonitor = (MonitorClient**)taosHashIterate(monitorCounterHash, NULL); - while (ppMonitor != NULL) { + MonitorClient** ppMonitor = NULL; + while ((ppMonitor = taosHashIterate(monitorSlowLogHash, ppMonitor))) { MonitorClient* pMonitor = *ppMonitor; - if (pMonitor != NULL){ - SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId); - if(pInst == NULL){ - taosHashCancelIterate(monitorCounterHash, ppMonitor); - break; - } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep); + if (pMonitor == NULL){ + continue; } - ppMonitor = taosHashIterate(monitorCounterHash, ppMonitor); + SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId); + if(pInst == NULL){ + taosHashCancelIterate(monitorCounterHash, ppMonitor); + break; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep); } } @@ -330,58 +222,58 @@ void monitorCreateClient(int64_t clusterId) { MonitorClient* pMonitor = NULL; taosWLockLatch(&monitorLock); if (taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES) == NULL) { - uInfo("[monitor] monitorCreateClient for %" PRIx64, clusterId); + tscInfo("[monitor] monitorCreateClient for %" PRIx64, clusterId); pMonitor = taosMemoryCalloc(1, sizeof(MonitorClient)); if (pMonitor == NULL) { - uError("failed to create monitor client"); + tscError("failed to create monitor client"); goto fail; } pMonitor->clusterId = clusterId; char clusterKey[32] = {0}; if(snprintf(clusterKey, sizeof(clusterKey), "%"PRId64, clusterId) < 0){ - uError("failed to create cluster key"); + tscError("failed to create cluster key"); goto fail; } pMonitor->registry = taos_collector_registry_new(clusterKey); if(pMonitor->registry == NULL){ - uError("failed to create registry"); + tscError("failed to create registry"); goto fail; } pMonitor->colector = taos_collector_new(clusterKey); if(pMonitor->colector == NULL){ - uError("failed to create collector"); + tscError("failed to create collector"); goto fail; } taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector); pMonitor->counters = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (pMonitor->counters == NULL) { - uError("failed to create monitor counters"); + tscError("failed to create monitor counters"); goto fail; } // taosHashSetFreeFp(pMonitor->counters, destroyCounter); if(taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0){ - uError("failed to put monitor client to hash"); + tscError("failed to put monitor client to hash"); goto fail; } SAppInstInfo* pInst = getAppInstByClusterId(clusterId); if(pInst == NULL){ - uError("failed to get app instance by cluster id"); + tscError("failed to get app instance by cluster id"); pMonitor = NULL; goto fail; } pMonitor->timer = taosTmrStart(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, (void*)pMonitor, monitorTimer); if(pMonitor->timer == NULL){ - uError("failed to start timer"); + tscError("failed to start timer"); goto fail; } - uInfo("[monitor] monitorCreateClient for %"PRIx64 "finished %p.", clusterId, pMonitor); + tscInfo("[monitor] monitorCreateClient for %"PRIx64 "finished %p.", clusterId, pMonitor); } taosWUnLockLatch(&monitorLock); if (-1 != atomic_val_compare_exchange_32(&monitorFlag, -1, 0)) { - uDebug("[monitor] monitorFlag already is 0"); + tscDebug("[monitor] monitorFlag already is 0"); } return; @@ -394,7 +286,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* taosWLockLatch(&monitorLock); MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES); if (ppMonitor == NULL || *ppMonitor == NULL) { - uError("failed to get monitor client"); + tscError("failed to get monitor client"); goto end; } taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys); @@ -403,11 +295,11 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* MonitorClient* pMonitor = *ppMonitor; taos_collector_add_metric(pMonitor->colector, newCounter); if(taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0){ - uError("failed to put counter to monitor"); + tscError("failed to put counter to monitor"); taos_counter_destroy(newCounter); goto end; } - uInfo("[monitor] monitorCreateClientCounter %"PRIx64"(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, newCounter); + tscInfo("[monitor] monitorCreateClientCounter %"PRIx64"(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, newCounter); end: taosWUnLockLatch(&monitorLock); @@ -415,20 +307,25 @@ end: void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values) { taosWLockLatch(&monitorLock); + if (atomic_load_32(&monitorFlag) == 1) { + taosRUnLockLatch(&monitorLock); + return; + } + MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES); if (ppMonitor == NULL || *ppMonitor == NULL) { - uError("monitorCounterInc not found pMonitor %"PRId64, clusterId); + tscError("monitorCounterInc not found pMonitor %"PRId64, clusterId); goto end; } MonitorClient* pMonitor = *ppMonitor; taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName)); - if (ppCounter == NULL || *ppCounter != NULL) { - uError("monitorCounterInc not found pCounter %"PRIx64":%s.", clusterId, counterName); + if (ppCounter == NULL || *ppCounter == NULL) { + tscError("monitorCounterInc not found pCounter %"PRIx64":%s.", clusterId, counterName); goto end; } taos_counter_inc(*ppCounter, label_values); - uInfo("[monitor] monitorCounterInc %"PRIx64"(%p):%s", pMonitor->clusterId, pMonitor, counterName); + tscDebug("[monitor] monitorCounterInc %"PRIx64"(%p):%s", pMonitor->clusterId, pMonitor, counterName); end: taosWUnLockLatch(&monitorLock); @@ -439,99 +336,349 @@ const char* monitorResultStr(SQL_RESULT_CODE code) { return result_state[code]; } -static void monitorFreeSlowLogData(MonitorSlowLogData* pData) { - if (pData == NULL) { - return; - } - taosMemoryFree(pData->value); -} - static void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); } -static void reportSlowLog(void* param, void* tmrId) { - taosRLockLatch(&monitorLock); - if (atomic_load_32(&monitorFlag) == 1) { - taosRUnLockLatch(&monitorLock); - return; - } - SAppInstInfo* pInst = getAppInstByClusterId((int64_t)param); - if(pInst == NULL){ - uError("failed to get app inst, clusterId:%"PRIx64, (int64_t)param); - taosRUnLockLatch(&monitorLock); - return; - } - - void* tmp = taosHashGet(monitorSlowLogHash, ¶m, LONG_BYTES); - if(tmp == NULL){ - uError("failed to get file inst, clusterId:%"PRIx64, (int64_t)param); - taosRUnLockLatch(&monitorLock); - return; - } - - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - monitorReadSendSlowLog((*(SlowLogClient**)tmp)->pFile, pInst->pTransporter, &ep); - taosRUnLockLatch(&monitorLock); - - taosTmrReset(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId); -} - static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpPath){ - taosWLockLatch(&monitorLock); TdFilePtr pFile = NULL; void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES); if (tmp == NULL){ char path[PATH_MAX] = {0}; char clusterId[32] = {0}; if (snprintf(clusterId, sizeof(clusterId), "%" PRIx64, slowLogData->clusterId) < 0){ - uError("failed to generate clusterId:%" PRIx64, slowLogData->clusterId); - goto FAILED; + tscError("failed to generate clusterId:%" PRIx64, slowLogData->clusterId); + return; } taosGetTmpfilePath(tmpPath, clusterId, path); - uInfo("[monitor] create slow log file:%s", path); + tscInfo("[monitor] create slow log file:%s", path); pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to open file:%s since %s", path, terrstr()); - goto FAILED; + tscError("failed to open file:%s since %s", path, terrstr()); + return; } SlowLogClient *pClient = taosMemoryCalloc(1, sizeof(SlowLogClient)); if (pClient == NULL){ - uError("failed to allocate memory for slow log client"); + tscError("failed to allocate memory for slow log client"); taosCloseFile(&pFile); - goto FAILED; + return; } + pClient->lastCheckTime = taosGetMonoTimestampMs(); + strcpy(pClient->path, path); + pClient->offset = 0; pClient->pFile = pFile; if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0){ - uError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId); + tscError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId); taosCloseFile(&pFile); taosMemoryFree(pClient); - goto FAILED; + return; } if(taosLockFile(pFile) < 0){ - uError("failed to lock file:%p since %s", pFile, terrstr()); - goto FAILED; + tscError("failed to lock file:%p since %s", pFile, terrstr()); + return; } - - SAppInstInfo* pInst = getAppInstByClusterId(slowLogData->clusterId); - if(pInst == NULL){ - uError("failed to get app instance by clusterId:%" PRId64, slowLogData->clusterId); - goto FAILED; - } - - pClient->timer = taosTmrStart(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, (void*)slowLogData->clusterId, monitorTimer); }else{ pFile = (*(SlowLogClient**)tmp)->pFile; } - if (taosWriteFile(pFile, slowLogData->value, strlen(slowLogData->value) + 1) < 0){ - uError("failed to write len to file:%p since %s", pFile, terrstr()); + if(taosLSeekFile(pFile, 0, SEEK_END) < 0){ + tscError("failed to seek file:%p code: %d", pFile, errno); + return; } - uDebug("[monitor] write slow log to file:%p, clusterId:%"PRIx64, pFile, slowLogData->clusterId); + if (taosWriteFile(pFile, slowLogData->data, strlen(slowLogData->data) + 1) < 0){ + tscError("failed to write len to file:%p since %s", pFile, terrstr()); + } + tscDebug("[monitor] write slow log to file:%p, clusterId:%"PRIx64, pFile, slowLogData->clusterId); +} -FAILED: - taosWUnLockLatch(&monitorLock); +static char* readFile(TdFilePtr pFile, int64_t *offset, int64_t size){ + tscDebug("[monitor] readFile slow begin pFile:%p, offset:%"PRId64 ", size:%"PRId64, pFile, *offset, size); + if(taosLSeekFile(pFile, *offset, SEEK_SET) < 0){ + tscError("failed to seek file:%p code: %d", pFile, errno); + return NULL; + } + + ASSERT(size > *offset); + char* pCont = NULL; + int64_t totalSize = 0; + if (size - *offset >= SLOW_LOG_SEND_SIZE_MAX) { + pCont = taosMemoryCalloc(1, 4 + SLOW_LOG_SEND_SIZE_MAX); //4 reserved for [] + totalSize = 4 + SLOW_LOG_SEND_SIZE_MAX; + }else{ + pCont = taosMemoryCalloc(1, 4 + (size - *offset)); + totalSize = 4 + (size - *offset); + } + + if(pCont == NULL){ + tscError("failed to allocate memory for slow log, size:%" PRId64, totalSize); + return NULL; + } + char* buf = pCont; + strcat(buf++, "["); + int64_t readSize = taosReadFile(pFile, buf, SLOW_LOG_SEND_SIZE_MAX); + if (readSize <= 0) { + if (readSize < 0){ + tscError("failed to read len from file:%p since %s", pFile, terrstr()); + } + taosMemoryFree(pCont); + return NULL; + } + + totalSize = 0; + while(1){ + size_t len = strlen(buf); + totalSize += (len+1); + if (totalSize > readSize || len == 0) { + *(buf-1) = ']'; + *buf = '\0'; + break; + } + buf[len] = ','; // replace '\0' with ',' + buf += (len + 1); + *offset += (len+1); + } + + tscDebug("[monitor] readFile slow log end, data:%s, offset:%"PRId64, pCont, *offset); + return pCont; +} + +static int64_t getFileSize(char* path){ + int64_t fileSize = 0; + if (taosStatFile(path, &fileSize, NULL, NULL) < 0) { + return -1; + } + + return fileSize; +} + +static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type, char* fileName, void* pTransporter, SEpSet *epSet){ + MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData)); + if(pParam == NULL){ + taosMemoryFree(data); + taosMemoryFree(fileName); + return -1; + } + pParam->data = data; + pParam->offset = offset; + pParam->clusterId = clusterId; + pParam->type = type; + pParam->pFile = pFile; + pParam->fileName = fileName; + return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam); +} + +static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset, void* pTransporter, SEpSet *epSet){ + int64_t size = getFileSize(*fileName); + if(size <= offset){ + processFileInTheEnd(pFile, *fileName); + tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName); + }else{ + char* data = readFile(pFile, &offset, size); + if(data != NULL){ + sendSlowLog(clusterId, data, pFile, offset, SLOW_LOG_READ_BEGINNIG, *fileName, pTransporter, epSet); + *fileName = NULL; + } + tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log file:%p, data:%s", pFile, data); + } +} + +static void monitorSendSlowLogAtRunning(int64_t clusterId){ + void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES); + if (tmp == NULL){ + return; + } + SlowLogClient* pClient = (*(SlowLogClient**)tmp); + if (pClient == NULL){ + return; + } + int64_t size = getFileSize(pClient->path); + if(size <= pClient->offset){ + if(taosFtruncateFile(pClient->pFile, 0) < 0){ + tscError("failed to truncate file:%p code: %d", pClient->pFile, errno); + } + tscDebug("[monitor] monitorSendSlowLogAtRunning truncate file to 0 file:%p", pClient->pFile); + pClient->offset = 0; + }else{ + SAppInstInfo* pInst = getAppInstByClusterId(clusterId); + if(pInst == NULL){ + tscError("failed to get app instance by clusterId:%" PRId64, clusterId); + return; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + char* data = readFile(pClient->pFile, &pClient->offset, size); + if(data != NULL){ + sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep); + } + tscDebug("[monitor] monitorSendSlowLogAtRunning send slow log:%s", data); + } +} + +static bool monitorSendSlowLogAtQuit(int64_t clusterId) { + void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES); + if (tmp == NULL){ + return true; + } + SlowLogClient* pClient = (*(SlowLogClient**)tmp); + if (pClient == NULL){ + return true; + } + int64_t size = getFileSize(pClient->path); + if(size <= pClient->offset){ + processFileInTheEnd(pClient->pFile, pClient->path); + pClient->pFile = NULL; + tscInfo("[monitor] monitorSendSlowLogAtQuit remove file:%s", pClient->path); + if((--quitCnt) == 0){ + return true; + } + }else{ + SAppInstInfo* pInst = getAppInstByClusterId(clusterId); + if(pInst == NULL) { + return true; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + char* data = readFile(pClient->pFile, &pClient->offset, size); + if(data != NULL){ + sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep); + } + tscInfo("[monitor] monitorSendSlowLogAtQuit send slow log:%s", data); + } + return false; +} +static void monitorSendAllSlowLogAtQuit(){ + void* pIter = NULL; + while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) { + SlowLogClient* pClient = (*(SlowLogClient**)pIter); + if(pClient == NULL) { + continue; + } + int64_t size = getFileSize(pClient->path); + if(size <= pClient->offset){ + processFileInTheEnd(pClient->pFile, pClient->path); + pClient->pFile = NULL; + }else if(pClient->offset == 0){ + int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL); + SAppInstInfo* pInst = getAppInstByClusterId(*clusterId); + if(pInst == NULL) { + continue; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + char* data = readFile(pClient->pFile, &pClient->offset, size); + if(data != NULL && sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep) == 0){ + quitCnt ++; + } + tscInfo("[monitor] monitorSendAllSlowLogAtQuit send slow log :%s", data); + } + } +} + +static void processFileRemoved(SlowLogClient* pClient){ + taosUnLockFile(pClient->pFile); + taosCloseFile(&(pClient->pFile)); + + TdFilePtr pFile = taosOpenFile(pClient->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); + if (pFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + tscError("failed to open file:%s since %s", pClient->path, terrstr()); + }else{ + pClient->pFile = pFile; + } +} + +static void monitorSendAllSlowLog(){ + int64_t t = taosGetMonoTimestampMs(); + void* pIter = NULL; + while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) { + int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL); + SAppInstInfo* pInst = getAppInstByClusterId(*clusterId); + SlowLogClient* pClient = (*(SlowLogClient**)pIter); + if (pClient == NULL){ + taosHashCancelIterate(monitorSlowLogHash, pIter); + return; + } + if (t - pClient->lastCheckTime > pInst->monitorParas.tsMonitorInterval * 1000){ + pClient->lastCheckTime = t; + } else { + continue; + } + + if (pInst != NULL && pClient->offset == 0) { + int64_t size = getFileSize(pClient->path); + if(size <= 0){ + if(size < 0){ + tscError("[monitor] monitorSendAllSlowLog failed to get file size:%s, err:%d", pClient->path, errno); + if(errno == ENOENT){ + processFileRemoved(pClient); + } + } + continue; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + char* data = readFile(pClient->pFile, &pClient->offset, size); + if(data != NULL){ + sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep); + } + tscDebug("[monitor] monitorSendAllSlowLog send slow log :%s", data); + } + } +} + +static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ + SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId); + + if(pInst == NULL || !pInst->monitorParas.tsEnableMonitor){ + tscInfo("[monitor] monitor is disabled, skip send slow log"); + return; + } + char namePrefix[PATH_MAX] = {0}; + if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, pInst->clusterId) < 0) { + tscError("failed to generate slow log file name prefix"); + return; + } + + char tmpPath[PATH_MAX] = {0}; + if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) { + return; + } + + TdDirPtr pDir = taosOpenDir(tmpPath); + if (pDir == NULL) { + return; + } + + TdDirEntryPtr de = NULL; + while ((de = taosReadDir(pDir)) != NULL) { + if (taosDirEntryIsDir(de)) { + continue; + } + + char *name = taosGetDirEntryName(de); + if (strcmp(name, ".") == 0 || + strcmp(name, "..") == 0 || + strstr(name, namePrefix) == NULL) { + tscInfo("skip file:%s, for cluster id:%"PRIx64, name, pInst->clusterId); + continue; + } + + char filename[PATH_MAX] = {0}; + snprintf(filename, sizeof(filename), "%s%s", tmpPath, name); + TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_WRITE); + if (pFile == NULL) { + tscError("failed to open file:%s since %s", filename, terrstr()); + continue; + } + if (taosLockFile(pFile) < 0) { + tscError("failed to lock file:%s since %s, maybe used by other process", filename, terrstr()); + taosCloseFile(&pFile); + continue; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + char *tmp = taosStrdup(filename); + monitorSendSlowLogAtBeginning(pInst->clusterId, &tmp, pFile, 0, pInst->pTransporter, &ep); + taosMemoryFree(tmp); + } + + taosCloseDir(&pDir); } static void* monitorThreadFunc(void *param){ @@ -543,10 +690,6 @@ static void* monitorThreadFunc(void *param){ } #endif - if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) { - return NULL; - } - char tmpPath[PATH_MAX] = {0}; if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0){ return NULL; @@ -559,32 +702,70 @@ static void* monitorThreadFunc(void *param){ } if (tsem2_init(&monitorSem, 0, 0) != 0) { - uError("sem init error since %s", terrstr()); + tscError("sem init error since %s", terrstr()); return NULL; } monitorQueue = taosOpenQueue(); if(monitorQueue == NULL){ - uError("open queue error since %s", terrstr()); + tscError("open queue error since %s", terrstr()); return NULL; } - uDebug("monitorThreadFunc start"); + + if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) { + return NULL; + } + tscDebug("monitorThreadFunc start"); + int64_t quitTime = 0; while (1) { - if (slowLogFlag > 0) break; + if (slowLogFlag > 0) { + if(quitCnt == 0){ + monitorSendAllSlowLogAtQuit(); + if(quitCnt == 0){ + tscInfo("monitorThreadFunc quit since no slow log to send"); + break; + } + quitTime = taosGetMonoTimestampMs(); + } + if(taosGetMonoTimestampMs() - quitTime > 500){ //quit at most 500ms + tscInfo("monitorThreadFunc quit since timeout"); + break; + } + } MonitorSlowLogData* slowLogData = NULL; taosReadQitem(monitorQueue, (void**)&slowLogData); if (slowLogData != NULL) { - uDebug("[monitor] read slow log data from queue, clusterId:%" PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value); - if (slowLogData->value == NULL){ - monitorSendAllSlowLogFromTempDir(slowLogData->clusterId); - }else{ + if (slowLogData->type == SLOW_LOG_READ_BEGINNIG){ + if(slowLogData->pFile != NULL){ + SAppInstInfo* pInst = getAppInstByClusterId(slowLogData->clusterId); + if(pInst != NULL) { + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile, slowLogData->offset, pInst->pTransporter, &ep); + } + }else{ + monitorSendAllSlowLogFromTempDir(slowLogData->clusterId); + } + } else if(slowLogData->type == SLOW_LOG_WRITE){ monitorWriteSlowLog2File(slowLogData, tmpPath); + } else if(slowLogData->type == SLOW_LOG_READ_RUNNING){ + monitorSendSlowLogAtRunning(slowLogData->clusterId); + } else if(slowLogData->type == SLOW_LOG_READ_QUIT){ + if(monitorSendSlowLogAtQuit(slowLogData->clusterId)){ + tscInfo("monitorThreadFunc quit since all slow log sended"); + monitorFreeSlowLogData(slowLogData); + taosFreeQitem(slowLogData); + break; + } } } monitorFreeSlowLogData(slowLogData); taosFreeQitem(slowLogData); - tsem2_timewait(&monitorSem, 500); + + if (quitCnt == 0) { + monitorSendAllSlowLog(); + } + tsem2_timewait(&monitorSem, 100); } taosCloseQueue(monitorQueue); @@ -599,7 +780,7 @@ static int32_t tscMonitortInit() { taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); TdThread monitorThread; if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) { - uError("failed to create monitor thread since %s", strerror(errno)); + tscError("failed to create monitor thread since %s", strerror(errno)); return -1; } @@ -609,7 +790,7 @@ static int32_t tscMonitortInit() { static void tscMonitorStop() { if (atomic_val_compare_exchange_32(&slowLogFlag, 0, 1)) { - uDebug("monitor thread already stopped"); + tscDebug("monitor thread already stopped"); return; } @@ -619,22 +800,22 @@ static void tscMonitorStop() { } void monitorInit() { - uInfo("[monitor] tscMonitor init"); + tscInfo("[monitor] tscMonitor init"); monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (monitorCounterHash == NULL) { - uError("failed to create monitorCounterHash"); + tscError("failed to create monitorCounterHash"); } taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient); monitorSlowLogHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (monitorSlowLogHash == NULL) { - uError("failed to create monitorSlowLogHash"); + tscError("failed to create monitorSlowLogHash"); } taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient); monitorTimer = taosTmrInit(0, 0, 0, "MONITOR"); if (monitorTimer == NULL) { - uError("failed to create monitor timer"); + tscError("failed to create monitor timer"); } taosInitRWLatch(&monitorLock); @@ -642,14 +823,13 @@ void monitorInit() { } void monitorClose() { - uInfo("[monitor] tscMonitor close"); + tscInfo("[monitor] tscMonitor close"); taosWLockLatch(&monitorLock); if (atomic_val_compare_exchange_32(&monitorFlag, 0, 1)) { - uDebug("[monitor] monitorFlag is not 0"); + tscDebug("[monitor] monitorFlag is not 0"); } tscMonitorStop(); - sendAllSlowLog(); sendAllCounter(); taosHashCleanup(monitorCounterHash); taosHashCleanup(monitorSlowLogHash); @@ -657,17 +837,16 @@ void monitorClose() { taosWUnLockLatch(&monitorLock); } -int32_t monitorPutData2MonitorQueue(int64_t clusterId, char* value){ +int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data){ MonitorSlowLogData* slowLogData = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0); if (slowLogData == NULL) { - uError("[monitor] failed to allocate slow log data"); + tscError("[monitor] failed to allocate slow log data"); return -1; } - slowLogData->clusterId = clusterId; - slowLogData->value = value; - uDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value); - while (monitorQueue == NULL) { - taosMsleep(100); + *slowLogData = data; + tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%d", slowLogData->clusterId, slowLogData->type); + while (atomic_load_32(&slowLogFlag) == -1) { + taosMsleep(5); } if (taosWriteQitem(monitorQueue, slowLogData) == 0){ tsem2_post(&monitorSem); diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 417cb8b562..e5baa7137e 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -155,7 +155,10 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { if(taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo, POINTER_BYTES) != 0){ tscError("failed to put appInfo into appInfo.pInstMapByClusterId"); } - monitorPutData2MonitorQueue(pTscObj->pAppInfo->clusterId, NULL); + MonitorSlowLogData data = {0}; + data.clusterId = pTscObj->pAppInfo->clusterId; + data.type = SLOW_LOG_READ_BEGINNIG; + monitorPutData2MonitorQueue(data); monitorClientSlowQueryInit(connectRsp.clusterId); monitorClientSQLReqInit(connectRsp.clusterId); } diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index a771fc1635..7e63dcd36a 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1380,7 +1380,7 @@ void freeSSmlKv(void *data) { void smlDestroyInfo(SSmlHandle *info) { if (!info) return; - qDestroyQuery(info->pQuery); +// qDestroyQuery(info->pQuery); taosHashCleanup(info->pVgHash); taosHashCleanup(info->childTables); @@ -1912,6 +1912,7 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, return (TAOS_RES *)request; } info->pRequest = request; + info->pRequest->pQuery = info->pQuery; info->ttl = ttl; info->precision = precision; info->protocol = (TSDB_SML_PROTOCOL_TYPE)protocol; diff --git a/source/client/test/clientMonitorTests.cpp b/source/client/test/clientMonitorTests.cpp index 2d3ce87f38..e0518a2ce2 100644 --- a/source/client/test/clientMonitorTests.cpp +++ b/source/client/test/clientMonitorTests.cpp @@ -64,6 +64,65 @@ int main(int argc, char** argv) { // clusterMonitorClose(cluster2); //} +static char* readFile(TdFilePtr pFile, int64_t *offset, int64_t size){ + if(taosLSeekFile(pFile, *offset, SEEK_SET) < 0){ + uError("failed to seek file:%p code: %d", pFile, errno); + return NULL; + } + + ASSERT(size > *offset); + char* pCont = NULL; + int64_t totalSize = 0; + if (size - *offset >= SLOW_LOG_SEND_SIZE_MAX) { + pCont = (char*)taosMemoryCalloc(1, 2 * SLOW_LOG_SEND_SIZE_MAX); + totalSize = 2 * SLOW_LOG_SEND_SIZE_MAX; + }else{ + pCont = (char*)taosMemoryCalloc(1, 2 * (size - *offset)); + totalSize = 2 * (size - *offset); + } + + if(pCont == NULL){ + uError("failed to allocate memory for slow log, size:%" PRId64, totalSize); + return NULL; + } + char* buf = pCont; + strcat(buf++, "["); + int64_t readSize = taosReadFile(pFile, buf, SLOW_LOG_SEND_SIZE_MAX); + if (readSize <= 0) { + if (readSize < 0){ + uError("failed to read len from file:%p since %s", pFile, terrstr()); + } + taosMemoryFree(pCont); + return NULL; + } + + totalSize = 0; + while(1){ + size_t len = strlen(buf); + totalSize += (len+1); + if (totalSize > readSize || len == 0) { + *(buf-1) = ']'; + *buf = '\0'; + break; + } + buf[len] = ','; // replace '\0' with ',' + buf += (len + 1); + *offset += (len+1); + } + + uDebug("[monitor] monitorReadSendSlowLog slow log:%s", pCont); + return pCont; +} + +static int64_t getFileSize(char* path){ + int64_t fileSize = 0; + if (taosStatFile(path, &fileSize, NULL, NULL) < 0) { + return -1; + } + + return fileSize; +} + TEST(clientMonitorTest, sendTest) { TAOS* taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0); ASSERT_TRUE(taos != NULL); @@ -91,8 +150,8 @@ TEST(clientMonitorTest, ReadOneFile) { } const int batch = 10; - const int size = SLOW_LOG_SEND_SIZE/batch; - for(int i = 0; i < batch + 1; i++){ + const int size = 10; + for(int i = 0; i < batch; i++){ char value[size] = {0}; memset(value, '0' + i, size - 1); if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ @@ -106,64 +165,72 @@ TEST(clientMonitorTest, ReadOneFile) { // Create an SEpSet object and set it up for testing SEpSet* epSet = NULL; + int64_t fileSize = getFileSize("./tdengine-1-wewe"); // Call the function to be tested -// monitorReadSendSlowLog(pFile, (int64_t)pTransporter, epSet); - - char value[size] = {0}; - memset(value, '0', size - 1); - if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ - uError("failed to write len to file:%p since %s", pFile, terrstr()); + int64_t offset = 0; + while(1){ + if (offset >= fileSize) { + break; + } + char* val = readFile(pFile, &offset, fileSize); + printf("offset:%lld,fileSize:%lld,val:%s\n", offset, fileSize, val); } +// char value[size] = {0}; +// memset(value, '0', size - 1); +// if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ +// uError("failed to write len to file:%p since %s", pFile, terrstr()); +// } + // monitorReadSendSlowLog(pFile, (int64_t)pTransporter, epSet); // Clean up any resources created for testing taosCloseFile(&pFile); } -TEST(clientMonitorTest, ReadTwoFile) { - // Create a TdFilePtr object and set it up for testing - - TdFilePtr pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-1-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); - if (pFile == NULL) { - uError("failed to open file:./test.txt since %s", terrstr()); - return; - } - - const int batch = 10; - const int size = SLOW_LOG_SEND_SIZE/batch; - for(int i = 0; i < batch + 1; i++){ - char value[size] = {0}; - memset(value, '0' + i, size - 1); - if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ - uError("failed to write len to file:%p since %s", pFile, terrstr()); - } - } - - taosFsyncFile(pFile); - taosCloseFile(&pFile); - - pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-2-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); - if (pFile == NULL) { - uError("failed to open file:./test.txt since %s", terrstr()); - return; - } - - for(int i = 0; i < batch + 1; i++){ - char value[size] = {0}; - memset(value, '0' + i, size - 1); - if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ - uError("failed to write len to file:%p since %s", pFile, terrstr()); - } - } - - taosFsyncFile(pFile); - taosCloseFile(&pFile); - - SAppInstInfo pAppInfo = {0}; - pAppInfo.clusterId = 2; - pAppInfo.monitorParas.tsEnableMonitor = 1; - strcpy(tsTempDir,"/tmp"); -// monitorSendAllSlowLogFromTempDir(&pAppInfo); - -} \ No newline at end of file +//TEST(clientMonitorTest, ReadTwoFile) { +// // Create a TdFilePtr object and set it up for testing +// +// TdFilePtr pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-1-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); +// if (pFile == NULL) { +// uError("failed to open file:./test.txt since %s", terrstr()); +// return; +// } +// +// const int batch = 10; +// const int size = SLOW_LOG_SEND_SIZE/batch; +// for(int i = 0; i < batch + 1; i++){ +// char value[size] = {0}; +// memset(value, '0' + i, size - 1); +// if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ +// uError("failed to write len to file:%p since %s", pFile, terrstr()); +// } +// } +// +// taosFsyncFile(pFile); +// taosCloseFile(&pFile); +// +// pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-2-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); +// if (pFile == NULL) { +// uError("failed to open file:./test.txt since %s", terrstr()); +// return; +// } +// +// for(int i = 0; i < batch + 1; i++){ +// char value[size] = {0}; +// memset(value, '0' + i, size - 1); +// if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ +// uError("failed to write len to file:%p since %s", pFile, terrstr()); +// } +// } +// +// taosFsyncFile(pFile); +// taosCloseFile(&pFile); +// +// SAppInstInfo pAppInfo = {0}; +// pAppInfo.clusterId = 2; +// pAppInfo.monitorParas.tsEnableMonitor = 1; +// strcpy(tsTempDir,"/tmp"); +//// monitorSendAllSlowLogFromTempDir(&pAppInfo); +// +//} \ No newline at end of file diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index f91b14b6d8..2338674ae3 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -1065,6 +1065,7 @@ int sml_escape1_Test() { for(int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++){ pRes = taos_schemaless_insert(taos, (char**)&sql[i], 1, TSDB_SML_LINE_PROTOCOL, 0); int code = taos_errno(pRes); + taos_free_result(pRes); ASSERT(code); }