fix:[TS-4921]refactor reporting logic for slow log
This commit is contained in:
parent
69f235d6b5
commit
a17ff3dbf5
|
@ -25,12 +25,19 @@ extern "C" {
|
|||
#include "query.h"
|
||||
#include "tqueue.h"
|
||||
|
||||
typedef enum SQL_RESULT_CODE {
|
||||
typedef enum {
|
||||
SQL_RESULT_SUCCESS = 0,
|
||||
SQL_RESULT_FAILED = 1,
|
||||
SQL_RESULT_CANCEL = 2,
|
||||
} SQL_RESULT_CODE;
|
||||
|
||||
typedef enum {
|
||||
SLOW_LOG_WRITE = 0,
|
||||
SLOW_LOG_READ_RUNNING = 1,
|
||||
SLOW_LOG_READ_BEGINNIG = 2,
|
||||
SLOW_LOG_READ_QUIT = 2,
|
||||
} SLOW_LOG_QUEUE_TYPE;
|
||||
|
||||
#define SLOW_LOG_SEND_SIZE 32*1024
|
||||
|
||||
typedef struct {
|
||||
|
@ -45,11 +52,18 @@ typedef struct {
|
|||
TdFilePtr pFile;
|
||||
int64_t lastCheckTime;
|
||||
char path[PATH_MAX];
|
||||
int64_t offset;
|
||||
} SlowLogClient;
|
||||
|
||||
typedef struct {
|
||||
int64_t clusterId;
|
||||
char *value;
|
||||
int64_t clusterId;
|
||||
SLOW_LOG_QUEUE_TYPE type;
|
||||
union{
|
||||
char* data;
|
||||
int64_t offset;
|
||||
};
|
||||
TdFilePtr pFile;
|
||||
char* fileName;
|
||||
} MonitorSlowLogData;
|
||||
|
||||
void monitorClose();
|
||||
|
@ -61,7 +75,7 @@ void monitorCreateClient(int64_t clusterId);
|
|||
void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count, const char** label_keys);
|
||||
void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values);
|
||||
const char* monitorResultStr(SQL_RESULT_CODE code);
|
||||
int32_t monitorPutData2MonitorQueue(int64_t clusterId, char* value);
|
||||
int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -157,7 +157,8 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_
|
|||
}
|
||||
|
||||
char* value = cJSON_PrintUnformatted(json);
|
||||
if(monitorPutData2MonitorQueue(pTscObj->pAppInfo->clusterId, value) < 0){
|
||||
MonitorSlowLogData data = {.clusterId = pTscObj->pAppInfo->clusterId, .type = SLOW_LOG_WRITE, .data = value};
|
||||
if(monitorPutData2MonitorQueue(data) < 0){
|
||||
taosMemoryFree(value);
|
||||
}
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ void* monitorTimer;
|
|||
SHashObj* monitorCounterHash;
|
||||
int32_t slowLogFlag = -1;
|
||||
int32_t monitorFlag = -1;
|
||||
int32_t quitCnt = 0;
|
||||
tsem2_t monitorSem;
|
||||
STaosQueue* monitorQueue;
|
||||
SHashObj* monitorSlowLogHash;
|
||||
|
@ -63,6 +64,19 @@ static void destroyMonitorClient(void* data){
|
|||
taosMemoryFree(pMonitor);
|
||||
}
|
||||
|
||||
static void monitorFreeSlowLogData(void *paras) {
|
||||
MonitorSlowLogData* pData = (MonitorSlowLogData*)paras;
|
||||
if (pData == NULL) {
|
||||
return;
|
||||
}
|
||||
if (pData->type == SLOW_LOG_WRITE){
|
||||
taosMemoryFree(pData->data);
|
||||
}
|
||||
if (pData->type == SLOW_LOG_READ_BEGINNIG){
|
||||
taosMemoryFree(pData->fileName);
|
||||
}
|
||||
}
|
||||
|
||||
static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) {
|
||||
void *p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES);
|
||||
if(p == NULL){
|
||||
|
@ -72,11 +86,6 @@ static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) {
|
|||
return *(SAppInstInfo**)p;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
tsem_t sem;
|
||||
int32_t code;
|
||||
} SlowLogParam;
|
||||
|
||||
static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
uError("found error in monitorReport send callback, code:%d, please check the network.", code);
|
||||
|
@ -86,9 +95,13 @@ static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
taosMemoryFree(pMsg->pEpSet);
|
||||
}
|
||||
if(param != NULL){
|
||||
SlowLogParam* p = (SlowLogParam*)param;
|
||||
p->code = code;
|
||||
tsem_post(&p->sem);
|
||||
MonitorSlowLogData* p = (MonitorSlowLogData*)param;
|
||||
if(code != 0){
|
||||
uError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId);
|
||||
}
|
||||
if(monitorPutData2MonitorQueue(*p) == 0){
|
||||
p->fileName = NULL;
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -100,11 +113,15 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO
|
|||
sStatisReq.type = type;
|
||||
|
||||
int tlen = tSerializeSStatisReq(NULL, 0, &sStatisReq);
|
||||
if (tlen < 0) return 0;
|
||||
if (tlen < 0) {
|
||||
monitorFreeSlowLogData(param);
|
||||
return -1;
|
||||
}
|
||||
void* buf = taosMemoryMalloc(tlen);
|
||||
if (buf == NULL) {
|
||||
uError("sendReport failed, out of memory, len:%d", tlen);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
monitorFreeSlowLogData(param);
|
||||
return -1;
|
||||
}
|
||||
tSerializeSStatisReq(buf, tlen, &sStatisReq);
|
||||
|
@ -113,6 +130,8 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO
|
|||
if (pInfo == NULL) {
|
||||
uError("sendReport failed, out of memory send info");
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
monitorFreeSlowLogData(param);
|
||||
taosMemoryFree(buf);
|
||||
return -1;
|
||||
}
|
||||
pInfo->fp = monitorReportAsyncCB;
|
||||
|
@ -120,7 +139,7 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO
|
|||
pInfo->msgInfo.len = tlen;
|
||||
pInfo->msgType = TDMT_MND_STATIS;
|
||||
pInfo->param = param;
|
||||
// pInfo->paramFreeFp = taosMemoryFree;
|
||||
pInfo->paramFreeFp = monitorFreeSlowLogData;
|
||||
pInfo->requestId = tGenIdPI64();
|
||||
pInfo->requestObjRefId = 0;
|
||||
|
||||
|
@ -132,89 +151,6 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO
|
|||
return code;
|
||||
}
|
||||
|
||||
static bool monitorReadSendSlowLog(TdFilePtr pFile, char* path, void* pTransporter, SEpSet *epSet){
|
||||
int64_t filesize = 0;
|
||||
if (taosStatFile(path, &filesize, NULL, NULL) < 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (filesize == 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
char buf[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log
|
||||
char pCont[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log
|
||||
int32_t offset = 0;
|
||||
if(taosLSeekFile(pFile, 0, SEEK_SET) < 0){
|
||||
uError("failed to seek file:%p code: %d", pFile, errno);
|
||||
return false;
|
||||
}
|
||||
while(1){
|
||||
int64_t readSize = taosReadFile(pFile, buf + offset, SLOW_LOG_SEND_SIZE - offset);
|
||||
if (readSize <= 0) {
|
||||
if (readSize < 0){
|
||||
uError("failed to read len from file:%p since %s", pFile, terrstr());
|
||||
return false;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
memset(pCont, 0, sizeof(pCont));
|
||||
strcat(pCont, "[");
|
||||
char* string = buf;
|
||||
for(int i = 0; i < readSize + offset; i++){
|
||||
if (buf[i] == '\0') {
|
||||
if (string != buf) strcat(pCont, ",");
|
||||
strcat(pCont, string);
|
||||
uDebug("[monitor] monitorReadSendSlowLog slow log:%s", string);
|
||||
string = buf + i + 1;
|
||||
}
|
||||
}
|
||||
strcat(pCont, "]");
|
||||
if (pTransporter && pCont != NULL) {
|
||||
SlowLogParam* pParam = taosMemoryMalloc(sizeof(SlowLogParam));
|
||||
if (pParam == NULL) {
|
||||
return false;
|
||||
}
|
||||
if (tsem_init(&pParam->sem, 0, 0) != 0){
|
||||
taosMemoryFree(pParam);
|
||||
return false;
|
||||
}
|
||||
pParam->code = sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_SLOW_LOG, pParam);
|
||||
if(pParam->code == TSDB_CODE_SUCCESS){
|
||||
tsem_wait(&pParam->sem);
|
||||
}
|
||||
tsem_destroy(&pParam->sem);
|
||||
|
||||
if(pParam->code != TSDB_CODE_SUCCESS){
|
||||
if(taosLSeekFile(pFile, -readSize, SEEK_CUR) < 0){
|
||||
uError("failed to seek file:%p code: %d", pFile, errno);
|
||||
}
|
||||
uError("failed to send report:%s", pCont);
|
||||
taosMemoryFree(pParam);
|
||||
return false;
|
||||
}
|
||||
taosMemoryFree(pParam);
|
||||
|
||||
uDebug("[monitor] monitorReadSendSlowLog send slow log to mnode:%s", pCont)
|
||||
}
|
||||
|
||||
if (readSize + offset < SLOW_LOG_SEND_SIZE) {
|
||||
break;
|
||||
}
|
||||
offset = SLOW_LOG_SEND_SIZE - (string - buf);
|
||||
if(buf != string && offset != 0){
|
||||
memmove(buf, string, offset);
|
||||
uDebug("[monitor] monitorReadSendSlowLog left slow log:%s", buf)
|
||||
}
|
||||
}
|
||||
if(taosFtruncateFile(pFile, 0) < 0){
|
||||
uError("failed to truncate file:%p code: %d", pFile, errno);
|
||||
}
|
||||
uDebug("[monitor] monitorReadSendSlowLog send slow log file:%p", pFile);
|
||||
return true;
|
||||
}
|
||||
|
||||
static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet *epSet) {
|
||||
char ts[50] = {0};
|
||||
sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI));
|
||||
|
@ -250,70 +186,6 @@ static void reportSendProcess(void* param, void* tmrId) {
|
|||
taosRUnLockLatch(&monitorLock);
|
||||
}
|
||||
|
||||
static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
|
||||
SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId);
|
||||
|
||||
if(pInst == NULL || !pInst->monitorParas.tsEnableMonitor){
|
||||
uInfo("[monitor] monitor is disabled, skip send slow log");
|
||||
return;
|
||||
}
|
||||
char namePrefix[PATH_MAX] = {0};
|
||||
if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, pInst->clusterId) < 0) {
|
||||
uError("failed to generate slow log file name prefix");
|
||||
return;
|
||||
}
|
||||
|
||||
char tmpPath[PATH_MAX] = {0};
|
||||
if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
TdDirPtr pDir = taosOpenDir(tmpPath);
|
||||
if (pDir == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
TdDirEntryPtr de = NULL;
|
||||
while ((de = taosReadDir(pDir)) != NULL) {
|
||||
if (taosDirEntryIsDir(de)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char *name = taosGetDirEntryName(de);
|
||||
if (strcmp(name, ".") == 0 ||
|
||||
strcmp(name, "..") == 0 ||
|
||||
strstr(name, namePrefix) == NULL) {
|
||||
uInfo("skip file:%s, for cluster id:%"PRIx64, name, pInst->clusterId);
|
||||
continue;
|
||||
}
|
||||
|
||||
char filename[PATH_MAX] = {0};
|
||||
snprintf(filename, sizeof(filename), "%s%s", tmpPath, name);
|
||||
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ);
|
||||
if (pFile == NULL) {
|
||||
uError("failed to open file:%s since %s", filename, terrstr());
|
||||
continue;
|
||||
}
|
||||
if (taosLockFile(pFile) < 0) {
|
||||
uError("failed to lock file:%s since %s, maybe used by other process", filename, terrstr());
|
||||
taosCloseFile(&pFile);
|
||||
continue;
|
||||
}
|
||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||
bool truncated = monitorReadSendSlowLog(pFile, filename, pInst->pTransporter, &ep);
|
||||
taosUnLockFile(pFile);
|
||||
taosCloseFile(&pFile);
|
||||
|
||||
if(truncated){
|
||||
taosRemoveFile(filename);
|
||||
}
|
||||
uDebug("[monitor] send and delete slow log file when reveive connect rsp:%s", filename);
|
||||
|
||||
}
|
||||
|
||||
taosCloseDir(&pDir);
|
||||
}
|
||||
|
||||
static void sendAllCounter(){
|
||||
MonitorClient** ppMonitor = (MonitorClient**)taosHashIterate(monitorCounterHash, NULL);
|
||||
while (ppMonitor != NULL) {
|
||||
|
@ -444,13 +316,6 @@ const char* monitorResultStr(SQL_RESULT_CODE code) {
|
|||
return result_state[code];
|
||||
}
|
||||
|
||||
static void monitorFreeSlowLogData(MonitorSlowLogData* pData) {
|
||||
if (pData == NULL) {
|
||||
return;
|
||||
}
|
||||
taosMemoryFree(pData->value);
|
||||
}
|
||||
|
||||
static void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); }
|
||||
|
||||
static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpPath){
|
||||
|
@ -480,6 +345,7 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP
|
|||
}
|
||||
pClient->lastCheckTime = taosGetMonoTimestampMs();
|
||||
strcpy(pClient->path, path);
|
||||
pClient->offset = 0;
|
||||
pClient->pFile = pFile;
|
||||
if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0){
|
||||
uError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId);
|
||||
|
@ -502,36 +368,236 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP
|
|||
pFile = (*(SlowLogClient**)tmp)->pFile;
|
||||
}
|
||||
|
||||
if (taosWriteFile(pFile, slowLogData->value, strlen(slowLogData->value) + 1) < 0){
|
||||
if(taosLSeekFile(pFile, 0, SEEK_END) < 0){
|
||||
uError("failed to seek file:%p code: %d", pFile, errno);
|
||||
return;
|
||||
}
|
||||
if (taosWriteFile(pFile, slowLogData->data, strlen(slowLogData->data) + 1) < 0){
|
||||
uError("failed to write len to file:%p since %s", pFile, terrstr());
|
||||
}
|
||||
uDebug("[monitor] write slow log to file:%p, clusterId:%"PRIx64, pFile, slowLogData->clusterId);
|
||||
}
|
||||
|
||||
static void monitorSendAllSlowLog(bool quit){
|
||||
int64_t t = taosGetMonoTimestampMs();
|
||||
static char* readFile(TdFilePtr pFile, int64_t *offset, bool* isEnd){
|
||||
if(taosLSeekFile(pFile, *offset, SEEK_SET) < 0){
|
||||
uError("failed to seek file:%p code: %d", pFile, errno);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int64_t totalSize = 0;
|
||||
char* pCont = taosMemoryCalloc(1, SLOW_LOG_SEND_SIZE);
|
||||
if(pCont == NULL){
|
||||
return NULL;
|
||||
}
|
||||
strcat(pCont, "[");
|
||||
|
||||
while(1) {
|
||||
char* pLine = NULL;
|
||||
int64_t readLen = taosGetLineFile(pFile, &pLine);
|
||||
|
||||
if(totalSize + readLen >= SLOW_LOG_SEND_SIZE){
|
||||
break;
|
||||
}
|
||||
if (readLen <= 0) {
|
||||
if (readLen < 0) {
|
||||
uError("failed to read len from file:%p since %s", pFile, terrstr());
|
||||
}else{
|
||||
*isEnd = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (totalSize != 0) strcat(pCont, ",");
|
||||
strcat(pCont, pLine);
|
||||
totalSize += readLen;
|
||||
}
|
||||
strcat(pCont, "]");
|
||||
uDebug("[monitor] monitorReadSendSlowLog slow log:%s", pCont);
|
||||
*offset += totalSize;
|
||||
return pCont;
|
||||
}
|
||||
|
||||
static bool isFileEmpty(char* path){
|
||||
int64_t filesize = 0;
|
||||
if (taosStatFile(path, &filesize, NULL, NULL) < 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (filesize == 0) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type, char* fileName, void* pTransporter, SEpSet *epSet){
|
||||
MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData));
|
||||
pParam->data = data;
|
||||
pParam->offset = offset;
|
||||
pParam->clusterId = clusterId;
|
||||
pParam->type = type;
|
||||
pParam->pFile = pFile;
|
||||
pParam->fileName = fileName;
|
||||
return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam);
|
||||
}
|
||||
|
||||
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char* fileName, TdFilePtr pFile, int64_t offset, void* pTransporter, SEpSet *epSet){
|
||||
bool isEnd = false;
|
||||
char* data = readFile(pFile, &offset, &isEnd);
|
||||
if(isEnd){
|
||||
taosFtruncateFile(pFile, 0);
|
||||
taosUnLockFile(pFile);
|
||||
taosCloseFile(&pFile);
|
||||
taosRemoveFile(fileName);
|
||||
uDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", fileName);
|
||||
}else{
|
||||
sendSlowLog(clusterId, data, pFile, offset, SLOW_LOG_READ_BEGINNIG, taosStrdup(fileName), pTransporter, epSet);
|
||||
uDebug("[monitor] monitorSendSlowLogAtBeginning send slow log file:%p", pFile);
|
||||
}
|
||||
}
|
||||
|
||||
static void monitorSendSlowLogAtRunning(int64_t clusterId){
|
||||
void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
|
||||
SlowLogClient* pClient = (*(SlowLogClient**)tmp);
|
||||
bool isEnd = false;
|
||||
char* data = readFile(pClient->pFile, &pClient->offset, &isEnd);
|
||||
if(isEnd){
|
||||
if(taosFtruncateFile(pClient->pFile, 0) < 0){
|
||||
uError("failed to truncate file:%p code: %d", pClient->pFile, errno);
|
||||
}
|
||||
pClient->offset = 0;
|
||||
}else if(data != NULL){
|
||||
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
|
||||
if(pInst == NULL){
|
||||
uError("failed to get app instance by clusterId:%" PRId64, clusterId);
|
||||
return;
|
||||
}
|
||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||
sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep);
|
||||
uDebug("[monitor] monitorReadSendSlowLog send slow log:%s", data);
|
||||
}
|
||||
}
|
||||
|
||||
static bool monitorSendSlowLogAtQuit(int64_t clusterId) {
|
||||
void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
|
||||
SlowLogClient* pClient = (*(SlowLogClient**)tmp);
|
||||
|
||||
bool isEnd = false;
|
||||
char* data = readFile(pClient->pFile, &pClient->offset, &isEnd);
|
||||
if(isEnd){
|
||||
taosUnLockFile(pClient->pFile);
|
||||
taosCloseFile(&(pClient->pFile));
|
||||
taosRemoveFile(pClient->path);
|
||||
if((--quitCnt) == 0){
|
||||
return true;
|
||||
}
|
||||
}else if(data != NULL){
|
||||
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
|
||||
if(pInst == NULL) {
|
||||
return true;
|
||||
}
|
||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||
sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep);
|
||||
uDebug("[monitor] monitorReadSendSlowLog send slow log:%s", data);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
static void monitorSendAllSlowLogAtQuit(){
|
||||
void* pIter = NULL;
|
||||
while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) {
|
||||
int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
|
||||
SAppInstInfo* pInst = getAppInstByClusterId(*clusterId);
|
||||
bool truncated = false;
|
||||
if(quit || (pInst != NULL && t - (*(SlowLogClient**)pIter)->lastCheckTime > pInst->monitorParas.tsMonitorInterval * 1000)) {
|
||||
(*(SlowLogClient**)pIter)->lastCheckTime = t;
|
||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||
truncated = monitorReadSendSlowLog((*(SlowLogClient**)pIter)->pFile, (*(SlowLogClient**)pIter)->path, pInst->pTransporter, &ep);
|
||||
}
|
||||
if(pInst == NULL) return;
|
||||
SlowLogClient* pClient = (*(SlowLogClient**)pIter);
|
||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||
bool isEnd = false;
|
||||
int64_t offset = 0;
|
||||
char* data = readFile(pClient->pFile, &offset, &isEnd);
|
||||
|
||||
if(quit){
|
||||
taosUnLockFile((*(SlowLogClient**)pIter)->pFile);
|
||||
taosCloseFile(&((*(SlowLogClient**)pIter)->pFile));
|
||||
if(truncated){
|
||||
taosRemoveFile((*(SlowLogClient**)pIter)->path);
|
||||
if(data != NULL && sendSlowLog(*clusterId, data, NULL, offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep) == 0){
|
||||
quitCnt ++;
|
||||
}
|
||||
uDebug("[monitor] monitorSendAllSlowLogAtQuit send slow log :%s", data);
|
||||
}
|
||||
}
|
||||
|
||||
static void monitorSendAllSlowLog(){
|
||||
int64_t t = taosGetMonoTimestampMs();
|
||||
void* pIter = NULL;
|
||||
while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) {
|
||||
int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
|
||||
SAppInstInfo* pInst = getAppInstByClusterId(*clusterId);
|
||||
SlowLogClient* pClient = (*(SlowLogClient**)pIter);
|
||||
if (pInst != NULL && t - pClient->lastCheckTime > pInst->monitorParas.tsMonitorInterval * 1000 &&
|
||||
pClient->offset == 0 && !isFileEmpty(pClient->path)) {
|
||||
pClient->lastCheckTime = t;
|
||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||
bool isEnd = false;
|
||||
int64_t offset = 0;
|
||||
char* data = readFile(pClient->pFile, &offset, &isEnd);
|
||||
if(data){
|
||||
sendSlowLog(*clusterId, data, NULL, offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep);
|
||||
}
|
||||
uDebug("[monitor] monitorSendAllSlowLog send slow log :%s", data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
|
||||
SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId);
|
||||
|
||||
if(pInst == NULL || !pInst->monitorParas.tsEnableMonitor){
|
||||
uInfo("[monitor] monitor is disabled, skip send slow log");
|
||||
return;
|
||||
}
|
||||
char namePrefix[PATH_MAX] = {0};
|
||||
if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, pInst->clusterId) < 0) {
|
||||
uError("failed to generate slow log file name prefix");
|
||||
return;
|
||||
}
|
||||
|
||||
char tmpPath[PATH_MAX] = {0};
|
||||
if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
TdDirPtr pDir = taosOpenDir(tmpPath);
|
||||
if (pDir == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
TdDirEntryPtr de = NULL;
|
||||
while ((de = taosReadDir(pDir)) != NULL) {
|
||||
if (taosDirEntryIsDir(de)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char *name = taosGetDirEntryName(de);
|
||||
if (strcmp(name, ".") == 0 ||
|
||||
strcmp(name, "..") == 0 ||
|
||||
strstr(name, namePrefix) == NULL) {
|
||||
uInfo("skip file:%s, for cluster id:%"PRIx64, name, pInst->clusterId);
|
||||
continue;
|
||||
}
|
||||
|
||||
char filename[PATH_MAX] = {0};
|
||||
snprintf(filename, sizeof(filename), "%s%s", tmpPath, name);
|
||||
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ);
|
||||
if (pFile == NULL) {
|
||||
uError("failed to open file:%s since %s", filename, terrstr());
|
||||
continue;
|
||||
}
|
||||
if (taosLockFile(pFile) < 0) {
|
||||
uError("failed to lock file:%s since %s, maybe used by other process", filename, terrstr());
|
||||
taosCloseFile(&pFile);
|
||||
continue;
|
||||
}
|
||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||
monitorSendSlowLogAtBeginning(pInst->clusterId, filename, pFile, 0, pInst->pTransporter, &ep);
|
||||
}
|
||||
|
||||
taosCloseDir(&pDir);
|
||||
}
|
||||
|
||||
static void* monitorThreadFunc(void *param){
|
||||
setThreadName("client-monitor-slowlog");
|
||||
|
||||
|
@ -567,26 +633,45 @@ static void* monitorThreadFunc(void *param){
|
|||
return NULL;
|
||||
}
|
||||
uDebug("monitorThreadFunc start");
|
||||
int64_t quitTime = 0;
|
||||
while (1) {
|
||||
if (slowLogFlag > 0) {
|
||||
monitorSendAllSlowLog(true);
|
||||
break;
|
||||
if(quitCnt == 0){
|
||||
monitorSendAllSlowLogAtQuit();
|
||||
quitTime = taosGetMonoTimestampMs();
|
||||
}
|
||||
if(taosGetMonoTimestampMs() - quitTime > 500){ //quit at most 500ms
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
MonitorSlowLogData* slowLogData = NULL;
|
||||
taosReadQitem(monitorQueue, (void**)&slowLogData);
|
||||
if (slowLogData != NULL) {
|
||||
uDebug("[monitor] read slow log data from queue, clusterId:%" PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value);
|
||||
if (slowLogData->value == NULL){
|
||||
monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);
|
||||
} else{
|
||||
if (slowLogData->type == SLOW_LOG_READ_BEGINNIG){
|
||||
if(slowLogData->pFile != NULL){
|
||||
SAppInstInfo* pInst = getAppInstByClusterId(slowLogData->clusterId);
|
||||
if(pInst != NULL) {
|
||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||
monitorSendSlowLogAtBeginning(slowLogData->clusterId, slowLogData->fileName, slowLogData->pFile, slowLogData->offset, pInst->pTransporter, &ep);
|
||||
}
|
||||
}else{
|
||||
monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);
|
||||
}
|
||||
} else if(slowLogData->type == SLOW_LOG_WRITE){
|
||||
monitorWriteSlowLog2File(slowLogData, tmpPath);
|
||||
} else if(slowLogData->type == SLOW_LOG_READ_RUNNING){
|
||||
monitorSendSlowLogAtRunning(slowLogData->clusterId);
|
||||
}else if(slowLogData->type == SLOW_LOG_READ_QUIT){
|
||||
if(monitorSendSlowLogAtQuit(slowLogData->clusterId)){
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
monitorFreeSlowLogData(slowLogData);
|
||||
taosFreeQitem(slowLogData);
|
||||
|
||||
monitorSendAllSlowLog(false);
|
||||
monitorSendAllSlowLog();
|
||||
tsem2_timewait(&monitorSem, 100);
|
||||
}
|
||||
|
||||
|
@ -659,15 +744,14 @@ void monitorClose() {
|
|||
taosWUnLockLatch(&monitorLock);
|
||||
}
|
||||
|
||||
int32_t monitorPutData2MonitorQueue(int64_t clusterId, char* value){
|
||||
int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data){
|
||||
MonitorSlowLogData* slowLogData = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0);
|
||||
if (slowLogData == NULL) {
|
||||
uError("[monitor] failed to allocate slow log data");
|
||||
return -1;
|
||||
}
|
||||
slowLogData->clusterId = clusterId;
|
||||
slowLogData->value = value;
|
||||
uDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value);
|
||||
*slowLogData = data;
|
||||
uDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%d", slowLogData->clusterId, slowLogData->type);
|
||||
while (atomic_load_32(&slowLogFlag) == -1) {
|
||||
taosMsleep(5);
|
||||
}
|
||||
|
|
|
@ -155,7 +155,8 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
if(taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo, POINTER_BYTES) != 0){
|
||||
tscError("failed to put appInfo into appInfo.pInstMapByClusterId");
|
||||
}
|
||||
monitorPutData2MonitorQueue(pTscObj->pAppInfo->clusterId, NULL);
|
||||
MonitorSlowLogData data ={.clusterId = pTscObj->pAppInfo->clusterId, .type = SLOW_LOG_READ_BEGINNIG, .pFile = NULL};
|
||||
monitorPutData2MonitorQueue(data);
|
||||
monitorClientSlowQueryInit(connectRsp.clusterId);
|
||||
monitorClientSQLReqInit(connectRsp.clusterId);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue