fix:[TS-4921]refactor reporting logic for slow log

This commit is contained in:
wangmm0220 2024-07-03 15:21:56 +08:00
parent 8805650be5
commit cfbd475fc2
2 changed files with 51 additions and 44 deletions

View File

@ -58,10 +58,8 @@ typedef struct {
typedef struct { typedef struct {
int64_t clusterId; int64_t clusterId;
SLOW_LOG_QUEUE_TYPE type; SLOW_LOG_QUEUE_TYPE type;
union{ char* data;
char* data; int64_t offset;
int64_t offset;
};
TdFilePtr pFile; TdFilePtr pFile;
char* fileName; char* fileName;
} MonitorSlowLogData; } MonitorSlowLogData;

View File

@ -31,22 +31,34 @@ static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size){
return 0; return 0;
} }
//static void destroyCounter(void* data){ static void processFileInTheEnd(TdFilePtr pFile, char* path){
// if (data == NULL) { if(pFile == NULL){
// return; return;
// } }
// taos_counter_t* conuter = *(taos_counter_t**)data; if(taosFtruncateFile(pFile, 0) != 0){
// if(conuter == NULL){ tscError("failed to truncate file:%s, errno:%d", path, errno);
// return; return;
// } }
// taos_counter_destroy(conuter); if(taosUnLockFile(pFile) != 0){
//} tscError("failed to unlock file:%s, errno:%d", path, errno);
return;
}
if(taosCloseFile(&(pFile)) != 0){
tscError("failed to close file:%s, errno:%d", path, errno);
return;
}
if(taosRemoveFile(path) != 0){
tscError("failed to remove file:%s, errno:%d", path, errno);
return;
}
}
static void destroySlowLogClient(void* data){ static void destroySlowLogClient(void* data){
if (data == NULL) { if (data == NULL) {
return; return;
} }
SlowLogClient* slowLogClient = *(SlowLogClient**)data; SlowLogClient* slowLogClient = *(SlowLogClient**)data;
processFileInTheEnd(slowLogClient->pFile, slowLogClient->path);
taosMemoryFree(slowLogClient); taosMemoryFree(slowLogClient);
} }
@ -70,12 +82,17 @@ static void monitorFreeSlowLogData(void *paras) {
if (pData == NULL) { if (pData == NULL) {
return; return;
} }
taosMemoryFree(pData->data); taosMemoryFreeClear(pData->data);
if (pData->type == SLOW_LOG_READ_BEGINNIG){ if (pData->type == SLOW_LOG_READ_BEGINNIG){
taosMemoryFree(pData->fileName); taosMemoryFree(pData->fileName);
} }
} }
static void monitorFreeSlowLogDataEx(void *paras) {
monitorFreeSlowLogData(paras);
taosMemoryFree(paras);
}
static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) { static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) {
void *p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES); void *p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES);
if(p == NULL){ if(p == NULL){
@ -98,12 +115,11 @@ static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
if(code != 0){ if(code != 0){
tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId); tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId);
} }
if(monitorPutData2MonitorQueue(*p) == 0){ MonitorSlowLogData tmp = {.clusterId = p->clusterId, .type = p->type, .fileName = p->fileName,
.pFile= p->pFile, .offset = p->offset, .data = NULL};
if(monitorPutData2MonitorQueue(tmp) == 0){
p->fileName = NULL; p->fileName = NULL;
p->data = NULL;
} }
monitorFreeSlowLogData(p);
taosMemoryFree(p);
} }
return code; return code;
} }
@ -138,19 +154,15 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO
pInfo->msgInfo.len = tlen; pInfo->msgInfo.len = tlen;
pInfo->msgType = TDMT_MND_STATIS; pInfo->msgType = TDMT_MND_STATIS;
pInfo->param = param; pInfo->param = param;
pInfo->paramFreeFp = monitorFreeSlowLogDataEx;
pInfo->requestId = tGenIdPI64(); pInfo->requestId = tGenIdPI64();
pInfo->requestObjRefId = 0; pInfo->requestObjRefId = 0;
int64_t transporterId = 0; int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo); return asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo);
if (code == TSDB_CODE_SUCCESS) {
return code;
}
FAILED: FAILED:
tscError("sendReport failed, code:%d", code); monitorFreeSlowLogDataEx(param);
monitorFreeSlowLogData(param);
taosMemoryFree(param);
return -1; return -1;
} }
@ -439,6 +451,7 @@ static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64
MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData)); MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData));
if(pParam == NULL){ if(pParam == NULL){
taosMemoryFree(data); taosMemoryFree(data);
taosMemoryFree(fileName);
return -1; return -1;
} }
pParam->data = data; pParam->data = data;
@ -450,18 +463,16 @@ static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64
return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam); return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam);
} }
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char* fileName, TdFilePtr pFile, int64_t offset, void* pTransporter, SEpSet *epSet){ static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset, void* pTransporter, SEpSet *epSet){
int64_t size = getFileSize(fileName); int64_t size = getFileSize(*fileName);
if(size <= offset){ if(size <= offset){
taosFtruncateFile(pFile, 0); processFileInTheEnd(pFile, *fileName);
taosUnLockFile(pFile); tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName);
taosCloseFile(&pFile);
taosRemoveFile(fileName);
tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", fileName);
}else{ }else{
char* data = readFile(pFile, &offset, size); char* data = readFile(pFile, &offset, size);
if(data != NULL){ if(data != NULL){
sendSlowLog(clusterId, data, pFile, offset, SLOW_LOG_READ_BEGINNIG, taosStrdup(fileName), pTransporter, epSet); sendSlowLog(clusterId, data, pFile, offset, SLOW_LOG_READ_BEGINNIG, *fileName, pTransporter, epSet);
*fileName = NULL;
} }
tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log file:%p, data:%s", pFile, data); tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log file:%p, data:%s", pFile, data);
} }
@ -509,10 +520,8 @@ static bool monitorSendSlowLogAtQuit(int64_t clusterId) {
} }
int64_t size = getFileSize(pClient->path); int64_t size = getFileSize(pClient->path);
if(size <= pClient->offset){ if(size <= pClient->offset){
taosFtruncateFile(pClient->pFile, 0); processFileInTheEnd(pClient->pFile, pClient->path);
taosUnLockFile(pClient->pFile); pClient->pFile = NULL;
taosCloseFile(&(pClient->pFile));
taosRemoveFile(pClient->path);
tscInfo("[monitor] monitorSendSlowLogAtQuit remove file:%s", pClient->path); tscInfo("[monitor] monitorSendSlowLogAtQuit remove file:%s", pClient->path);
if((--quitCnt) == 0){ if((--quitCnt) == 0){
return true; return true;
@ -540,10 +549,8 @@ static void monitorSendAllSlowLogAtQuit(){
} }
int64_t size = getFileSize(pClient->path); int64_t size = getFileSize(pClient->path);
if(size <= pClient->offset){ if(size <= pClient->offset){
taosFtruncateFile(pClient->pFile, 0); processFileInTheEnd(pClient->pFile, pClient->path);
taosUnLockFile(pClient->pFile); pClient->pFile = NULL;
taosCloseFile(&(pClient->pFile));
taosRemoveFile(pClient->path);
}else if(pClient->offset == 0){ }else if(pClient->offset == 0){
int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL); int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
SAppInstInfo* pInst = getAppInstByClusterId(*clusterId); SAppInstInfo* pInst = getAppInstByClusterId(*clusterId);
@ -661,7 +668,9 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
continue; continue;
} }
SEpSet ep = getEpSet_s(&pInst->mgmtEp); SEpSet ep = getEpSet_s(&pInst->mgmtEp);
monitorSendSlowLogAtBeginning(pInst->clusterId, filename, pFile, 0, pInst->pTransporter, &ep); char *tmp = taosStrdup(filename);
monitorSendSlowLogAtBeginning(pInst->clusterId, &tmp, pFile, 0, pInst->pTransporter, &ep);
taosMemoryFree(tmp);
} }
taosCloseDir(&pDir); taosCloseDir(&pDir);
@ -727,7 +736,7 @@ static void* monitorThreadFunc(void *param){
SAppInstInfo* pInst = getAppInstByClusterId(slowLogData->clusterId); SAppInstInfo* pInst = getAppInstByClusterId(slowLogData->clusterId);
if(pInst != NULL) { if(pInst != NULL) {
SEpSet ep = getEpSet_s(&pInst->mgmtEp); SEpSet ep = getEpSet_s(&pInst->mgmtEp);
monitorSendSlowLogAtBeginning(slowLogData->clusterId, slowLogData->fileName, slowLogData->pFile, slowLogData->offset, pInst->pTransporter, &ep); monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile, slowLogData->offset, pInst->pTransporter, &ep);
} }
}else{ }else{
monitorSendAllSlowLogFromTempDir(slowLogData->clusterId); monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);