From cfbd475fc28e7a08fde957ac4b9a746f70dae702 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 3 Jul 2024 15:21:56 +0800 Subject: [PATCH] fix:[TS-4921]refactor reporting logic for slow log --- include/libs/monitor/clientMonitor.h | 6 +- source/client/src/clientMonitor.c | 89 +++++++++++++++------------- 2 files changed, 51 insertions(+), 44 deletions(-) diff --git a/include/libs/monitor/clientMonitor.h b/include/libs/monitor/clientMonitor.h index 69926c62e5..4c7ab6f65a 100644 --- a/include/libs/monitor/clientMonitor.h +++ b/include/libs/monitor/clientMonitor.h @@ -58,10 +58,8 @@ typedef struct { typedef struct { int64_t clusterId; SLOW_LOG_QUEUE_TYPE type; - union{ - char* data; - int64_t offset; - }; + char* data; + int64_t offset; TdFilePtr pFile; char* fileName; } MonitorSlowLogData; diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index 79b91c994a..a102995c0d 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -31,22 +31,34 @@ static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size){ return 0; } -//static void destroyCounter(void* data){ -// if (data == NULL) { -// return; -// } -// taos_counter_t* conuter = *(taos_counter_t**)data; -// if(conuter == NULL){ -// return; -// } -// taos_counter_destroy(conuter); -//} +static void processFileInTheEnd(TdFilePtr pFile, char* path){ + if(pFile == NULL){ + return; + } + if(taosFtruncateFile(pFile, 0) != 0){ + tscError("failed to truncate file:%s, errno:%d", path, errno); + return; + } + if(taosUnLockFile(pFile) != 0){ + tscError("failed to unlock file:%s, errno:%d", path, errno); + return; + } + if(taosCloseFile(&(pFile)) != 0){ + tscError("failed to close file:%s, errno:%d", path, errno); + return; + } + if(taosRemoveFile(path) != 0){ + tscError("failed to remove file:%s, errno:%d", path, errno); + return; + } +} static void destroySlowLogClient(void* data){ if (data == NULL) { return; } SlowLogClient* slowLogClient = *(SlowLogClient**)data; + processFileInTheEnd(slowLogClient->pFile, slowLogClient->path); taosMemoryFree(slowLogClient); } @@ -70,12 +82,17 @@ static void monitorFreeSlowLogData(void *paras) { if (pData == NULL) { return; } - taosMemoryFree(pData->data); + taosMemoryFreeClear(pData->data); if (pData->type == SLOW_LOG_READ_BEGINNIG){ taosMemoryFree(pData->fileName); } } +static void monitorFreeSlowLogDataEx(void *paras) { + monitorFreeSlowLogData(paras); + taosMemoryFree(paras); +} + static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) { void *p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES); if(p == NULL){ @@ -98,12 +115,11 @@ static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) { if(code != 0){ tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId); } - if(monitorPutData2MonitorQueue(*p) == 0){ + MonitorSlowLogData tmp = {.clusterId = p->clusterId, .type = p->type, .fileName = p->fileName, + .pFile= p->pFile, .offset = p->offset, .data = NULL}; + if(monitorPutData2MonitorQueue(tmp) == 0){ p->fileName = NULL; - p->data = NULL; } - monitorFreeSlowLogData(p); - taosMemoryFree(p); } return code; } @@ -138,19 +154,15 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO pInfo->msgInfo.len = tlen; pInfo->msgType = TDMT_MND_STATIS; pInfo->param = param; + pInfo->paramFreeFp = monitorFreeSlowLogDataEx; pInfo->requestId = tGenIdPI64(); pInfo->requestObjRefId = 0; int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo); - if (code == TSDB_CODE_SUCCESS) { - return code; - } + return asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo); FAILED: - tscError("sendReport failed, code:%d", code); - monitorFreeSlowLogData(param); - taosMemoryFree(param); + monitorFreeSlowLogDataEx(param); return -1; } @@ -439,6 +451,7 @@ static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64 MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData)); if(pParam == NULL){ taosMemoryFree(data); + taosMemoryFree(fileName); return -1; } pParam->data = data; @@ -450,18 +463,16 @@ static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64 return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam); } -static void monitorSendSlowLogAtBeginning(int64_t clusterId, char* fileName, TdFilePtr pFile, int64_t offset, void* pTransporter, SEpSet *epSet){ - int64_t size = getFileSize(fileName); +static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset, void* pTransporter, SEpSet *epSet){ + int64_t size = getFileSize(*fileName); if(size <= offset){ - taosFtruncateFile(pFile, 0); - taosUnLockFile(pFile); - taosCloseFile(&pFile); - taosRemoveFile(fileName); - tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", fileName); + processFileInTheEnd(pFile, *fileName); + tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName); }else{ char* data = readFile(pFile, &offset, size); if(data != NULL){ - sendSlowLog(clusterId, data, pFile, offset, SLOW_LOG_READ_BEGINNIG, taosStrdup(fileName), pTransporter, epSet); + sendSlowLog(clusterId, data, pFile, offset, SLOW_LOG_READ_BEGINNIG, *fileName, pTransporter, epSet); + *fileName = NULL; } tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log file:%p, data:%s", pFile, data); } @@ -509,10 +520,8 @@ static bool monitorSendSlowLogAtQuit(int64_t clusterId) { } int64_t size = getFileSize(pClient->path); if(size <= pClient->offset){ - taosFtruncateFile(pClient->pFile, 0); - taosUnLockFile(pClient->pFile); - taosCloseFile(&(pClient->pFile)); - taosRemoveFile(pClient->path); + processFileInTheEnd(pClient->pFile, pClient->path); + pClient->pFile = NULL; tscInfo("[monitor] monitorSendSlowLogAtQuit remove file:%s", pClient->path); if((--quitCnt) == 0){ return true; @@ -540,10 +549,8 @@ static void monitorSendAllSlowLogAtQuit(){ } int64_t size = getFileSize(pClient->path); if(size <= pClient->offset){ - taosFtruncateFile(pClient->pFile, 0); - taosUnLockFile(pClient->pFile); - taosCloseFile(&(pClient->pFile)); - taosRemoveFile(pClient->path); + processFileInTheEnd(pClient->pFile, pClient->path); + pClient->pFile = NULL; }else if(pClient->offset == 0){ int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL); SAppInstInfo* pInst = getAppInstByClusterId(*clusterId); @@ -661,7 +668,9 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ continue; } SEpSet ep = getEpSet_s(&pInst->mgmtEp); - monitorSendSlowLogAtBeginning(pInst->clusterId, filename, pFile, 0, pInst->pTransporter, &ep); + char *tmp = taosStrdup(filename); + monitorSendSlowLogAtBeginning(pInst->clusterId, &tmp, pFile, 0, pInst->pTransporter, &ep); + taosMemoryFree(tmp); } taosCloseDir(&pDir); @@ -727,7 +736,7 @@ static void* monitorThreadFunc(void *param){ SAppInstInfo* pInst = getAppInstByClusterId(slowLogData->clusterId); if(pInst != NULL) { SEpSet ep = getEpSet_s(&pInst->mgmtEp); - monitorSendSlowLogAtBeginning(slowLogData->clusterId, slowLogData->fileName, slowLogData->pFile, slowLogData->offset, pInst->pTransporter, &ep); + monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile, slowLogData->offset, pInst->pTransporter, &ep); } }else{ monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);