From a17ff3dbf5a3f4cdb642d865c05720897b061b87 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 1 Jul 2024 19:08:16 +0800 Subject: [PATCH] fix:[TS-4921]refactor reporting logic for slow log --- include/libs/monitor/clientMonitor.h | 22 +- source/client/src/clientEnv.c | 3 +- source/client/src/clientMonitor.c | 462 ++++++++++++++++----------- source/client/src/clientMsgHandler.c | 3 +- 4 files changed, 295 insertions(+), 195 deletions(-) diff --git a/include/libs/monitor/clientMonitor.h b/include/libs/monitor/clientMonitor.h index a8cdbe4ad1..5e89c07b2b 100644 --- a/include/libs/monitor/clientMonitor.h +++ b/include/libs/monitor/clientMonitor.h @@ -25,12 +25,19 @@ extern "C" { #include "query.h" #include "tqueue.h" -typedef enum SQL_RESULT_CODE { +typedef enum { SQL_RESULT_SUCCESS = 0, SQL_RESULT_FAILED = 1, SQL_RESULT_CANCEL = 2, } SQL_RESULT_CODE; +typedef enum { + SLOW_LOG_WRITE = 0, + SLOW_LOG_READ_RUNNING = 1, + SLOW_LOG_READ_BEGINNIG = 2, + SLOW_LOG_READ_QUIT = 2, +} SLOW_LOG_QUEUE_TYPE; + #define SLOW_LOG_SEND_SIZE 32*1024 typedef struct { @@ -45,11 +52,18 @@ typedef struct { TdFilePtr pFile; int64_t lastCheckTime; char path[PATH_MAX]; + int64_t offset; } SlowLogClient; typedef struct { - int64_t clusterId; - char *value; + int64_t clusterId; + SLOW_LOG_QUEUE_TYPE type; + union{ + char* data; + int64_t offset; + }; + TdFilePtr pFile; + char* fileName; } MonitorSlowLogData; void monitorClose(); @@ -61,7 +75,7 @@ void monitorCreateClient(int64_t clusterId); void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count, const char** label_keys); void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values); const char* monitorResultStr(SQL_RESULT_CODE code); -int32_t monitorPutData2MonitorQueue(int64_t clusterId, char* value); +int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data); #ifdef __cplusplus } #endif diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 1248f87f19..6689334c1e 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -157,7 +157,8 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_ } char* value = cJSON_PrintUnformatted(json); - if(monitorPutData2MonitorQueue(pTscObj->pAppInfo->clusterId, value) < 0){ + MonitorSlowLogData data = {.clusterId = pTscObj->pAppInfo->clusterId, .type = SLOW_LOG_WRITE, .data = value}; + if(monitorPutData2MonitorQueue(data) < 0){ taosMemoryFree(value); } diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index fe5848f24c..a4be6210bb 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -13,6 +13,7 @@ void* monitorTimer; SHashObj* monitorCounterHash; int32_t slowLogFlag = -1; int32_t monitorFlag = -1; +int32_t quitCnt = 0; tsem2_t monitorSem; STaosQueue* monitorQueue; SHashObj* monitorSlowLogHash; @@ -63,6 +64,19 @@ static void destroyMonitorClient(void* data){ taosMemoryFree(pMonitor); } +static void monitorFreeSlowLogData(void *paras) { + MonitorSlowLogData* pData = (MonitorSlowLogData*)paras; + if (pData == NULL) { + return; + } + if (pData->type == SLOW_LOG_WRITE){ + taosMemoryFree(pData->data); + } + if (pData->type == SLOW_LOG_READ_BEGINNIG){ + taosMemoryFree(pData->fileName); + } +} + static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) { void *p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES); if(p == NULL){ @@ -72,11 +86,6 @@ static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) { return *(SAppInstInfo**)p; } -typedef struct { - tsem_t sem; - int32_t code; -} SlowLogParam; - static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) { if (TSDB_CODE_SUCCESS != code) { uError("found error in monitorReport send callback, code:%d, please check the network.", code); @@ -86,9 +95,13 @@ static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pEpSet); } if(param != NULL){ - SlowLogParam* p = (SlowLogParam*)param; - p->code = code; - tsem_post(&p->sem); + MonitorSlowLogData* p = (MonitorSlowLogData*)param; + if(code != 0){ + uError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId); + } + if(monitorPutData2MonitorQueue(*p) == 0){ + p->fileName = NULL; + } } return code; } @@ -100,11 +113,15 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO sStatisReq.type = type; int tlen = tSerializeSStatisReq(NULL, 0, &sStatisReq); - if (tlen < 0) return 0; + if (tlen < 0) { + monitorFreeSlowLogData(param); + return -1; + } void* buf = taosMemoryMalloc(tlen); if (buf == NULL) { uError("sendReport failed, out of memory, len:%d", tlen); terrno = TSDB_CODE_OUT_OF_MEMORY; + monitorFreeSlowLogData(param); return -1; } tSerializeSStatisReq(buf, tlen, &sStatisReq); @@ -113,6 +130,8 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO if (pInfo == NULL) { uError("sendReport failed, out of memory send info"); terrno = TSDB_CODE_OUT_OF_MEMORY; + monitorFreeSlowLogData(param); + taosMemoryFree(buf); return -1; } pInfo->fp = monitorReportAsyncCB; @@ -120,7 +139,7 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO pInfo->msgInfo.len = tlen; pInfo->msgType = TDMT_MND_STATIS; pInfo->param = param; -// pInfo->paramFreeFp = taosMemoryFree; + pInfo->paramFreeFp = monitorFreeSlowLogData; pInfo->requestId = tGenIdPI64(); pInfo->requestObjRefId = 0; @@ -132,89 +151,6 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO return code; } -static bool monitorReadSendSlowLog(TdFilePtr pFile, char* path, void* pTransporter, SEpSet *epSet){ - int64_t filesize = 0; - if (taosStatFile(path, &filesize, NULL, NULL) < 0) { - return false; - } - - if (filesize == 0) { - return true; - } - - char buf[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log - char pCont[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log - int32_t offset = 0; - if(taosLSeekFile(pFile, 0, SEEK_SET) < 0){ - uError("failed to seek file:%p code: %d", pFile, errno); - return false; - } - while(1){ - int64_t readSize = taosReadFile(pFile, buf + offset, SLOW_LOG_SEND_SIZE - offset); - if (readSize <= 0) { - if (readSize < 0){ - uError("failed to read len from file:%p since %s", pFile, terrstr()); - return false; - } - break; - } - - memset(pCont, 0, sizeof(pCont)); - strcat(pCont, "["); - char* string = buf; - for(int i = 0; i < readSize + offset; i++){ - if (buf[i] == '\0') { - if (string != buf) strcat(pCont, ","); - strcat(pCont, string); - uDebug("[monitor] monitorReadSendSlowLog slow log:%s", string); - string = buf + i + 1; - } - } - strcat(pCont, "]"); - if (pTransporter && pCont != NULL) { - SlowLogParam* pParam = taosMemoryMalloc(sizeof(SlowLogParam)); - if (pParam == NULL) { - return false; - } - if (tsem_init(&pParam->sem, 0, 0) != 0){ - taosMemoryFree(pParam); - return false; - } - pParam->code = sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_SLOW_LOG, pParam); - if(pParam->code == TSDB_CODE_SUCCESS){ - tsem_wait(&pParam->sem); - } - tsem_destroy(&pParam->sem); - - if(pParam->code != TSDB_CODE_SUCCESS){ - if(taosLSeekFile(pFile, -readSize, SEEK_CUR) < 0){ - uError("failed to seek file:%p code: %d", pFile, errno); - } - uError("failed to send report:%s", pCont); - taosMemoryFree(pParam); - return false; - } - taosMemoryFree(pParam); - - uDebug("[monitor] monitorReadSendSlowLog send slow log to mnode:%s", pCont) - } - - if (readSize + offset < SLOW_LOG_SEND_SIZE) { - break; - } - offset = SLOW_LOG_SEND_SIZE - (string - buf); - if(buf != string && offset != 0){ - memmove(buf, string, offset); - uDebug("[monitor] monitorReadSendSlowLog left slow log:%s", buf) - } - } - if(taosFtruncateFile(pFile, 0) < 0){ - uError("failed to truncate file:%p code: %d", pFile, errno); - } - uDebug("[monitor] monitorReadSendSlowLog send slow log file:%p", pFile); - return true; -} - static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet *epSet) { char ts[50] = {0}; sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI)); @@ -250,70 +186,6 @@ static void reportSendProcess(void* param, void* tmrId) { taosRUnLockLatch(&monitorLock); } -static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ - SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId); - - if(pInst == NULL || !pInst->monitorParas.tsEnableMonitor){ - uInfo("[monitor] monitor is disabled, skip send slow log"); - return; - } - char namePrefix[PATH_MAX] = {0}; - if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, pInst->clusterId) < 0) { - uError("failed to generate slow log file name prefix"); - return; - } - - char tmpPath[PATH_MAX] = {0}; - if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) { - return; - } - - TdDirPtr pDir = taosOpenDir(tmpPath); - if (pDir == NULL) { - return; - } - - TdDirEntryPtr de = NULL; - while ((de = taosReadDir(pDir)) != NULL) { - if (taosDirEntryIsDir(de)) { - continue; - } - - char *name = taosGetDirEntryName(de); - if (strcmp(name, ".") == 0 || - strcmp(name, "..") == 0 || - strstr(name, namePrefix) == NULL) { - uInfo("skip file:%s, for cluster id:%"PRIx64, name, pInst->clusterId); - continue; - } - - char filename[PATH_MAX] = {0}; - snprintf(filename, sizeof(filename), "%s%s", tmpPath, name); - TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ); - if (pFile == NULL) { - uError("failed to open file:%s since %s", filename, terrstr()); - continue; - } - if (taosLockFile(pFile) < 0) { - uError("failed to lock file:%s since %s, maybe used by other process", filename, terrstr()); - taosCloseFile(&pFile); - continue; - } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - bool truncated = monitorReadSendSlowLog(pFile, filename, pInst->pTransporter, &ep); - taosUnLockFile(pFile); - taosCloseFile(&pFile); - - if(truncated){ - taosRemoveFile(filename); - } - uDebug("[monitor] send and delete slow log file when reveive connect rsp:%s", filename); - - } - - taosCloseDir(&pDir); -} - static void sendAllCounter(){ MonitorClient** ppMonitor = (MonitorClient**)taosHashIterate(monitorCounterHash, NULL); while (ppMonitor != NULL) { @@ -444,13 +316,6 @@ const char* monitorResultStr(SQL_RESULT_CODE code) { return result_state[code]; } -static void monitorFreeSlowLogData(MonitorSlowLogData* pData) { - if (pData == NULL) { - return; - } - taosMemoryFree(pData->value); -} - static void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); } static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpPath){ @@ -480,6 +345,7 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP } pClient->lastCheckTime = taosGetMonoTimestampMs(); strcpy(pClient->path, path); + pClient->offset = 0; pClient->pFile = pFile; if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0){ uError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId); @@ -502,36 +368,236 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP pFile = (*(SlowLogClient**)tmp)->pFile; } - if (taosWriteFile(pFile, slowLogData->value, strlen(slowLogData->value) + 1) < 0){ + if(taosLSeekFile(pFile, 0, SEEK_END) < 0){ + uError("failed to seek file:%p code: %d", pFile, errno); + return; + } + if (taosWriteFile(pFile, slowLogData->data, strlen(slowLogData->data) + 1) < 0){ uError("failed to write len to file:%p since %s", pFile, terrstr()); } uDebug("[monitor] write slow log to file:%p, clusterId:%"PRIx64, pFile, slowLogData->clusterId); } -static void monitorSendAllSlowLog(bool quit){ - int64_t t = taosGetMonoTimestampMs(); +static char* readFile(TdFilePtr pFile, int64_t *offset, bool* isEnd){ + if(taosLSeekFile(pFile, *offset, SEEK_SET) < 0){ + uError("failed to seek file:%p code: %d", pFile, errno); + return NULL; + } + int64_t totalSize = 0; + char* pCont = taosMemoryCalloc(1, SLOW_LOG_SEND_SIZE); + if(pCont == NULL){ + return NULL; + } + strcat(pCont, "["); + + while(1) { + char* pLine = NULL; + int64_t readLen = taosGetLineFile(pFile, &pLine); + + if(totalSize + readLen >= SLOW_LOG_SEND_SIZE){ + break; + } + if (readLen <= 0) { + if (readLen < 0) { + uError("failed to read len from file:%p since %s", pFile, terrstr()); + }else{ + *isEnd = true; + } + break; + } + + if (totalSize != 0) strcat(pCont, ","); + strcat(pCont, pLine); + totalSize += readLen; + } + strcat(pCont, "]"); + uDebug("[monitor] monitorReadSendSlowLog slow log:%s", pCont); + *offset += totalSize; + return pCont; +} + +static bool isFileEmpty(char* path){ + int64_t filesize = 0; + if (taosStatFile(path, &filesize, NULL, NULL) < 0) { + return false; + } + + if (filesize == 0) { + return true; + } + return false; +} + +static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type, char* fileName, void* pTransporter, SEpSet *epSet){ + MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData)); + pParam->data = data; + pParam->offset = offset; + pParam->clusterId = clusterId; + pParam->type = type; + pParam->pFile = pFile; + pParam->fileName = fileName; + return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam); +} + +static void monitorSendSlowLogAtBeginning(int64_t clusterId, char* fileName, TdFilePtr pFile, int64_t offset, void* pTransporter, SEpSet *epSet){ + bool isEnd = false; + char* data = readFile(pFile, &offset, &isEnd); + if(isEnd){ + taosFtruncateFile(pFile, 0); + taosUnLockFile(pFile); + taosCloseFile(&pFile); + taosRemoveFile(fileName); + uDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", fileName); + }else{ + sendSlowLog(clusterId, data, pFile, offset, SLOW_LOG_READ_BEGINNIG, taosStrdup(fileName), pTransporter, epSet); + uDebug("[monitor] monitorSendSlowLogAtBeginning send slow log file:%p", pFile); + } +} + +static void monitorSendSlowLogAtRunning(int64_t clusterId){ + void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES); + SlowLogClient* pClient = (*(SlowLogClient**)tmp); + bool isEnd = false; + char* data = readFile(pClient->pFile, &pClient->offset, &isEnd); + if(isEnd){ + if(taosFtruncateFile(pClient->pFile, 0) < 0){ + uError("failed to truncate file:%p code: %d", pClient->pFile, errno); + } + pClient->offset = 0; + }else if(data != NULL){ + SAppInstInfo* pInst = getAppInstByClusterId(clusterId); + if(pInst == NULL){ + uError("failed to get app instance by clusterId:%" PRId64, clusterId); + return; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep); + uDebug("[monitor] monitorReadSendSlowLog send slow log:%s", data); + } +} + +static bool monitorSendSlowLogAtQuit(int64_t clusterId) { + void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES); + SlowLogClient* pClient = (*(SlowLogClient**)tmp); + + bool isEnd = false; + char* data = readFile(pClient->pFile, &pClient->offset, &isEnd); + if(isEnd){ + taosUnLockFile(pClient->pFile); + taosCloseFile(&(pClient->pFile)); + taosRemoveFile(pClient->path); + if((--quitCnt) == 0){ + return true; + } + }else if(data != NULL){ + SAppInstInfo* pInst = getAppInstByClusterId(clusterId); + if(pInst == NULL) { + return true; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep); + uDebug("[monitor] monitorReadSendSlowLog send slow log:%s", data); + } + return false; +} +static void monitorSendAllSlowLogAtQuit(){ void* pIter = NULL; while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) { int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL); SAppInstInfo* pInst = getAppInstByClusterId(*clusterId); - bool truncated = false; - if(quit || (pInst != NULL && t - (*(SlowLogClient**)pIter)->lastCheckTime > pInst->monitorParas.tsMonitorInterval * 1000)) { - (*(SlowLogClient**)pIter)->lastCheckTime = t; - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - truncated = monitorReadSendSlowLog((*(SlowLogClient**)pIter)->pFile, (*(SlowLogClient**)pIter)->path, pInst->pTransporter, &ep); - } + if(pInst == NULL) return; + SlowLogClient* pClient = (*(SlowLogClient**)pIter); + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + bool isEnd = false; + int64_t offset = 0; + char* data = readFile(pClient->pFile, &offset, &isEnd); - if(quit){ - taosUnLockFile((*(SlowLogClient**)pIter)->pFile); - taosCloseFile(&((*(SlowLogClient**)pIter)->pFile)); - if(truncated){ - taosRemoveFile((*(SlowLogClient**)pIter)->path); + if(data != NULL && sendSlowLog(*clusterId, data, NULL, offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep) == 0){ + quitCnt ++; + } + uDebug("[monitor] monitorSendAllSlowLogAtQuit send slow log :%s", data); + } +} + +static void monitorSendAllSlowLog(){ + int64_t t = taosGetMonoTimestampMs(); + void* pIter = NULL; + while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) { + int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL); + SAppInstInfo* pInst = getAppInstByClusterId(*clusterId); + SlowLogClient* pClient = (*(SlowLogClient**)pIter); + if (pInst != NULL && t - pClient->lastCheckTime > pInst->monitorParas.tsMonitorInterval * 1000 && + pClient->offset == 0 && !isFileEmpty(pClient->path)) { + pClient->lastCheckTime = t; + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + bool isEnd = false; + int64_t offset = 0; + char* data = readFile(pClient->pFile, &offset, &isEnd); + if(data){ + sendSlowLog(*clusterId, data, NULL, offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep); } + uDebug("[monitor] monitorSendAllSlowLog send slow log :%s", data); } } } +static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ + SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId); + + if(pInst == NULL || !pInst->monitorParas.tsEnableMonitor){ + uInfo("[monitor] monitor is disabled, skip send slow log"); + return; + } + char namePrefix[PATH_MAX] = {0}; + if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, pInst->clusterId) < 0) { + uError("failed to generate slow log file name prefix"); + return; + } + + char tmpPath[PATH_MAX] = {0}; + if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) { + return; + } + + TdDirPtr pDir = taosOpenDir(tmpPath); + if (pDir == NULL) { + return; + } + + TdDirEntryPtr de = NULL; + while ((de = taosReadDir(pDir)) != NULL) { + if (taosDirEntryIsDir(de)) { + continue; + } + + char *name = taosGetDirEntryName(de); + if (strcmp(name, ".") == 0 || + strcmp(name, "..") == 0 || + strstr(name, namePrefix) == NULL) { + uInfo("skip file:%s, for cluster id:%"PRIx64, name, pInst->clusterId); + continue; + } + + char filename[PATH_MAX] = {0}; + snprintf(filename, sizeof(filename), "%s%s", tmpPath, name); + TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ); + if (pFile == NULL) { + uError("failed to open file:%s since %s", filename, terrstr()); + continue; + } + if (taosLockFile(pFile) < 0) { + uError("failed to lock file:%s since %s, maybe used by other process", filename, terrstr()); + taosCloseFile(&pFile); + continue; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + monitorSendSlowLogAtBeginning(pInst->clusterId, filename, pFile, 0, pInst->pTransporter, &ep); + } + + taosCloseDir(&pDir); +} + static void* monitorThreadFunc(void *param){ setThreadName("client-monitor-slowlog"); @@ -567,26 +633,45 @@ static void* monitorThreadFunc(void *param){ return NULL; } uDebug("monitorThreadFunc start"); + int64_t quitTime = 0; while (1) { if (slowLogFlag > 0) { - monitorSendAllSlowLog(true); - break; + if(quitCnt == 0){ + monitorSendAllSlowLogAtQuit(); + quitTime = taosGetMonoTimestampMs(); + } + if(taosGetMonoTimestampMs() - quitTime > 500){ //quit at most 500ms + break; + } } MonitorSlowLogData* slowLogData = NULL; taosReadQitem(monitorQueue, (void**)&slowLogData); if (slowLogData != NULL) { - uDebug("[monitor] read slow log data from queue, clusterId:%" PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value); - if (slowLogData->value == NULL){ - monitorSendAllSlowLogFromTempDir(slowLogData->clusterId); - } else{ + if (slowLogData->type == SLOW_LOG_READ_BEGINNIG){ + if(slowLogData->pFile != NULL){ + SAppInstInfo* pInst = getAppInstByClusterId(slowLogData->clusterId); + if(pInst != NULL) { + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + monitorSendSlowLogAtBeginning(slowLogData->clusterId, slowLogData->fileName, slowLogData->pFile, slowLogData->offset, pInst->pTransporter, &ep); + } + }else{ + monitorSendAllSlowLogFromTempDir(slowLogData->clusterId); + } + } else if(slowLogData->type == SLOW_LOG_WRITE){ monitorWriteSlowLog2File(slowLogData, tmpPath); + } else if(slowLogData->type == SLOW_LOG_READ_RUNNING){ + monitorSendSlowLogAtRunning(slowLogData->clusterId); + }else if(slowLogData->type == SLOW_LOG_READ_QUIT){ + if(monitorSendSlowLogAtQuit(slowLogData->clusterId)){ + break; + } } } monitorFreeSlowLogData(slowLogData); taosFreeQitem(slowLogData); - monitorSendAllSlowLog(false); + monitorSendAllSlowLog(); tsem2_timewait(&monitorSem, 100); } @@ -659,15 +744,14 @@ void monitorClose() { taosWUnLockLatch(&monitorLock); } -int32_t monitorPutData2MonitorQueue(int64_t clusterId, char* value){ +int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data){ MonitorSlowLogData* slowLogData = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0); if (slowLogData == NULL) { uError("[monitor] failed to allocate slow log data"); return -1; } - slowLogData->clusterId = clusterId; - slowLogData->value = value; - uDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value); + *slowLogData = data; + uDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%d", slowLogData->clusterId, slowLogData->type); while (atomic_load_32(&slowLogFlag) == -1) { taosMsleep(5); } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 417cb8b562..87c334218f 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -155,7 +155,8 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { if(taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo, POINTER_BYTES) != 0){ tscError("failed to put appInfo into appInfo.pInstMapByClusterId"); } - monitorPutData2MonitorQueue(pTscObj->pAppInfo->clusterId, NULL); + MonitorSlowLogData data ={.clusterId = pTscObj->pAppInfo->clusterId, .type = SLOW_LOG_READ_BEGINNIG, .pFile = NULL}; + monitorPutData2MonitorQueue(data); monitorClientSlowQueryInit(connectRsp.clusterId); monitorClientSQLReqInit(connectRsp.clusterId); }