From 6c5acdfc4bb5c247fb5c85347e356b3cd9b43387 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 5 Jul 2024 14:03:12 +0800 Subject: [PATCH] fix:[TS-4921]refactor code --- include/libs/monitor/clientMonitor.h | 7 +++ source/client/src/clientMonitor.c | 80 ++++++++++++---------------- 2 files changed, 40 insertions(+), 47 deletions(-) diff --git a/include/libs/monitor/clientMonitor.h b/include/libs/monitor/clientMonitor.h index 1e6db8c00a..3bb325921e 100644 --- a/include/libs/monitor/clientMonitor.h +++ b/include/libs/monitor/clientMonitor.h @@ -38,6 +38,13 @@ typedef enum { SLOW_LOG_READ_QUIT = 3, } SLOW_LOG_QUEUE_TYPE; +char* queueTypeStr[] = { + "SLOW_LOG_WRITE", + "SLOW_LOG_READ_RUNNING", + "SLOW_LOG_READ_BEGINNIG", + "SLOW_LOG_READ_QUIT" +}; + #define SLOW_LOG_SEND_SIZE_MAX 1024*1024 typedef struct { diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index 032b7cdeea..d1a9897caa 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -454,6 +454,10 @@ static int64_t getFileSize(char* path){ } static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type, char* fileName, void* pTransporter, SEpSet *epSet){ + if (data == NULL){ + taosMemoryFree(fileName); + return -1; + } MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData)); if(pParam == NULL){ taosMemoryFree(data); @@ -469,17 +473,26 @@ static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64 return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam); } -static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset, void* pTransporter, SEpSet *epSet){ +static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offset, int64_t size, SLOW_LOG_QUEUE_TYPE type, char* fileName){ + SAppInstInfo* pInst = getAppInstByClusterId(clusterId); + if(pInst == NULL){ + tscError("failed to get app instance by clusterId:%" PRId64, clusterId); + return -1; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + char* data = readFile(pFile, offset, size); + return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName, pInst->pTransporter, &ep); +} + +static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset){ int64_t size = getFileSize(*fileName); if(size <= offset){ processFileInTheEnd(pFile, *fileName); tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName); }else{ - char* data = readFile(pFile, &offset, size); - if(data != NULL){ - sendSlowLog(clusterId, data, pFile, offset, SLOW_LOG_READ_BEGINNIG, *fileName, pTransporter, epSet); - *fileName = NULL; - } + int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName); + tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log clusterId:%"PRId64",ret:%d", clusterId, code); + *fileName = NULL; } } @@ -500,16 +513,8 @@ static void monitorSendSlowLogAtRunning(int64_t clusterId){ tscDebug("[monitor] monitorSendSlowLogAtRunning truncate file to 0 file:%p", pClient->pFile); pClient->offset = 0; }else{ - SAppInstInfo* pInst = getAppInstByClusterId(clusterId); - if(pInst == NULL){ - tscError("failed to get app instance by clusterId:%" PRId64, clusterId); - return; - } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - char* data = readFile(pClient->pFile, &pClient->offset, size); - if(data != NULL){ - sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep); - } + int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL); + tscDebug("[monitor] monitorSendSlowLogAtRunning send slow log clusterId:%"PRId64",ret:%d", clusterId, code); } } @@ -531,15 +536,8 @@ static bool monitorSendSlowLogAtQuit(int64_t clusterId) { return true; } }else{ - SAppInstInfo* pInst = getAppInstByClusterId(clusterId); - if(pInst == NULL) { - return true; - } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - char* data = readFile(pClient->pFile, &pClient->offset, size); - if(data != NULL){ - sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep); - } + int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL); + tscDebug("[monitor] monitorSendSlowLogAtQuit send slow log clusterId:%"PRId64",ret:%d", clusterId, code); } return false; } @@ -556,13 +554,9 @@ static void monitorSendAllSlowLogAtQuit(){ pClient->pFile = NULL; }else if(pClient->offset == 0){ int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL); - SAppInstInfo* pInst = getAppInstByClusterId(*clusterId); - if(pInst == NULL) { - continue; - } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - char* data = readFile(pClient->pFile, &pClient->offset, size); - if(data != NULL && sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep) == 0){ + int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL); + tscDebug("[monitor] monitorSendAllSlowLogAtQuit send slow log clusterId:%"PRId64",ret:%d", *clusterId, code); + if (code == 0){ quitCnt ++; } } @@ -610,11 +604,8 @@ static void monitorSendAllSlowLog(){ } continue; } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - char* data = readFile(pClient->pFile, &pClient->offset, size); - if(data != NULL){ - sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep); - } + int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL); + tscDebug("[monitor] monitorSendAllSlowLog send slow log clusterId:%"PRId64",ret:%d", *clusterId, code); } } } @@ -627,7 +618,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ return; } char namePrefix[PATH_MAX] = {0}; - if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, pInst->clusterId) < 0) { + if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, clusterId) < 0) { tscError("failed to generate slow log file name prefix"); return; } @@ -652,7 +643,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || strstr(name, namePrefix) == NULL) { - tscInfo("skip file:%s, for cluster id:%"PRIx64, name, pInst->clusterId); + tscInfo("skip file:%s, for cluster id:%"PRIx64, name, clusterId); continue; } @@ -668,9 +659,8 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ taosCloseFile(&pFile); continue; } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); char *tmp = taosStrdup(filename); - monitorSendSlowLogAtBeginning(pInst->clusterId, &tmp, pFile, 0, pInst->pTransporter, &ep); + monitorSendSlowLogAtBeginning(clusterId, &tmp, pFile, 0); taosMemoryFree(tmp); } @@ -712,11 +702,7 @@ static void* monitorThreadFunc(void *param){ if (slowLogData != NULL) { if (slowLogData->type == SLOW_LOG_READ_BEGINNIG){ if(slowLogData->pFile != NULL){ - SAppInstInfo* pInst = getAppInstByClusterId(slowLogData->clusterId); - if(pInst != NULL) { - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile, slowLogData->offset, pInst->pTransporter, &ep); - } + monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile, slowLogData->offset); }else{ monitorSendAllSlowLogFromTempDir(slowLogData->clusterId); } @@ -850,7 +836,7 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data){ return -1; } *slowLogData = data; - tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%d", slowLogData->clusterId, slowLogData->type); + tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%s, data:%s", slowLogData->clusterId, queueTypeStr[slowLogData->type], slowLogData->data); if (taosWriteQitem(monitorQueue, slowLogData) == 0){ tsem2_post(&monitorSem); }else{