diff --git a/include/libs/monitor/clientMonitor.h b/include/libs/monitor/clientMonitor.h index 5e89c07b2b..aa6ee6c9fc 100644 --- a/include/libs/monitor/clientMonitor.h +++ b/include/libs/monitor/clientMonitor.h @@ -38,7 +38,7 @@ typedef enum { SLOW_LOG_READ_QUIT = 2, } SLOW_LOG_QUEUE_TYPE; -#define SLOW_LOG_SEND_SIZE 32*1024 +#define SLOW_LOG_SEND_SIZE_MAX 128*1024*1024 typedef struct { int64_t clusterId; diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index b30224c510..94b1fa9388 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -187,19 +187,19 @@ static void reportSendProcess(void* param, void* tmrId) { } static void sendAllCounter(){ - MonitorClient** ppMonitor = (MonitorClient**)taosHashIterate(monitorCounterHash, NULL); - while (ppMonitor != NULL) { + MonitorClient** ppMonitor = NULL; + while ((ppMonitor = taosHashIterate(monitorSlowLogHash, ppMonitor))) { MonitorClient* pMonitor = *ppMonitor; - if (pMonitor != NULL){ - SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId); - if(pInst == NULL){ - taosHashCancelIterate(monitorCounterHash, ppMonitor); - break; - } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep); + if (pMonitor == NULL){ + continue; } - ppMonitor = taosHashIterate(monitorCounterHash, ppMonitor); + SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId); + if(pInst == NULL){ + taosHashCancelIterate(monitorCounterHash, ppMonitor); + break; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep); } } @@ -330,7 +330,7 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP } taosGetTmpfilePath(tmpPath, clusterId, path); uInfo("[monitor] create slow log file:%s", path); - pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_STREAM); + pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to open file:%s since %s", path, terrstr()); @@ -378,63 +378,70 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP uDebug("[monitor] write slow log to file:%p, clusterId:%"PRIx64, pFile, slowLogData->clusterId); } -static char* readFileByLine(TdFilePtr pFile, int64_t *offset, bool* isEnd){ +static char* readFile(TdFilePtr pFile, int64_t *offset, int64_t size){ if(taosLSeekFile(pFile, *offset, SEEK_SET) < 0){ uError("failed to seek file:%p code: %d", pFile, errno); return NULL; } + ASSERT(size > *offset); + char* pCont = NULL; int64_t totalSize = 0; - char* pCont = taosMemoryCalloc(1, SLOW_LOG_SEND_SIZE); + if (size - *offset >= SLOW_LOG_SEND_SIZE_MAX) { + pCont = taosMemoryCalloc(1, 2 * SLOW_LOG_SEND_SIZE_MAX); + totalSize = 2 * SLOW_LOG_SEND_SIZE_MAX; + }else{ + pCont = taosMemoryCalloc(1, 2 * (size - *offset)); + totalSize = 2 * (size - *offset); + } + if(pCont == NULL){ + uError("failed to allocate memory for slow log, size:%" PRId64, totalSize); return NULL; } - strcat(pCont, "["); - - while(1) { - char* pLine = NULL; - int64_t readLen = taosGetLineFile(pFile, &pLine); - - if(totalSize + readLen >= SLOW_LOG_SEND_SIZE){ - break; + char* buf = pCont; + strcat(buf++, "["); + int64_t readSize = taosReadFile(pFile, buf, SLOW_LOG_SEND_SIZE_MAX); + if (readSize <= 0) { + if (readSize < 0){ + uError("failed to read len from file:%p since %s", pFile, terrstr()); } - if (readLen <= 0) { - if (readLen < 0) { - uError("failed to read len from file:%p since %s", pFile, terrstr()); - }else if(totalSize == 0){ - *isEnd = true; - } - break; - } - - if (totalSize != 0) strcat(pCont, ","); - strcat(pCont, pLine); - totalSize += readLen; - } - strcat(pCont, "]"); - uDebug("[monitor] monitorReadSendSlowLog slow log:%s", pCont); - *offset += totalSize; - if(*isEnd){ taosMemoryFree(pCont); return NULL; } + + totalSize = 0; + while(1){ + size_t len = strlen(buf); + totalSize += (len+1); + if (totalSize > readSize || len == 0) { + *(buf-1) = ']'; + *buf = '\0'; + break; + } + buf[len] = ','; // replace '\0' with ',' + buf += (len + 1); + *offset += (len+1); + } + + uDebug("[monitor] monitorReadSendSlowLog slow log:%s", pCont); return pCont; } -static bool isFileEmpty(char* path){ - int64_t filesize = 0; - if (taosStatFile(path, &filesize, NULL, NULL) < 0) { - return false; +static int64_t getFileSize(char* path){ + int64_t fileSize = 0; + if (taosStatFile(path, &fileSize, NULL, NULL) < 0) { + return -1; } - if (filesize == 0) { - return true; - } - return false; + return fileSize; } static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type, char* fileName, void* pTransporter, SEpSet *epSet){ MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData)); + if(pParam == NULL){ + return -1; + } pParam->data = data; pParam->offset = offset; pParam->clusterId = clusterId; @@ -445,15 +452,15 @@ static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64 } static void monitorSendSlowLogAtBeginning(int64_t clusterId, char* fileName, TdFilePtr pFile, int64_t offset, void* pTransporter, SEpSet *epSet){ - bool isEnd = false; - char* data = readFileByLine(pFile, &offset, &isEnd); - if(isEnd){ + int64_t size = getFileSize(fileName); + if(size <= offset){ taosFtruncateFile(pFile, 0); taosUnLockFile(pFile); taosCloseFile(&pFile); taosRemoveFile(fileName); uDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", fileName); }else{ + char* data = readFile(pFile, &offset, size); if(data != NULL){ sendSlowLog(clusterId, data, pFile, offset, SLOW_LOG_READ_BEGINNIG, taosStrdup(fileName), pTransporter, epSet); } @@ -463,46 +470,61 @@ static void monitorSendSlowLogAtBeginning(int64_t clusterId, char* fileName, TdF static void monitorSendSlowLogAtRunning(int64_t clusterId){ void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES); + if (tmp == NULL){ + return; + } SlowLogClient* pClient = (*(SlowLogClient**)tmp); - bool isEnd = false; - char* data = readFileByLine(pClient->pFile, &pClient->offset, &isEnd); - if(isEnd){ + if (pClient == NULL){ + return; + } + int64_t size = getFileSize(pClient->path); + if(size <= pClient->offset){ if(taosFtruncateFile(pClient->pFile, 0) < 0){ uError("failed to truncate file:%p code: %d", pClient->pFile, errno); } pClient->offset = 0; - }else if(data != NULL){ + }else{ SAppInstInfo* pInst = getAppInstByClusterId(clusterId); if(pInst == NULL){ uError("failed to get app instance by clusterId:%" PRId64, clusterId); return; } SEpSet ep = getEpSet_s(&pInst->mgmtEp); - sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep); + char* data = readFile(pClient->pFile, &pClient->offset, size); + if(data != NULL){ + sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep); + } uDebug("[monitor] monitorReadSendSlowLog send slow log:%s", data); } } static bool monitorSendSlowLogAtQuit(int64_t clusterId) { void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES); + if (tmp == NULL){ + return true; + } SlowLogClient* pClient = (*(SlowLogClient**)tmp); - - bool isEnd = false; - char* data = readFileByLine(pClient->pFile, &pClient->offset, &isEnd); - if(isEnd){ + if (pClient == NULL){ + return true; + } + int64_t size = getFileSize(pClient->path); + if(size <= pClient->offset){ taosUnLockFile(pClient->pFile); taosCloseFile(&(pClient->pFile)); taosRemoveFile(pClient->path); if((--quitCnt) == 0){ return true; } - }else if(data != NULL){ + }else{ SAppInstInfo* pInst = getAppInstByClusterId(clusterId); if(pInst == NULL) { return true; } SEpSet ep = getEpSet_s(&pInst->mgmtEp); - sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep); + char* data = readFile(pClient->pFile, &pClient->offset, size); + if(data != NULL){ + sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep); + } uDebug("[monitor] monitorReadSendSlowLog send slow log:%s", data); } return false; @@ -510,14 +532,21 @@ static bool monitorSendSlowLogAtQuit(int64_t clusterId) { static void monitorSendAllSlowLogAtQuit(){ void* pIter = NULL; while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) { + SlowLogClient* pClient = (*(SlowLogClient**)pIter); + if(pClient == NULL) { + taosHashCancelIterate(monitorSlowLogHash, pIter); + return; + } int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL); SAppInstInfo* pInst = getAppInstByClusterId(*clusterId); - if(pInst == NULL) return; - SlowLogClient* pClient = (*(SlowLogClient**)pIter); + if(pInst == NULL) { + taosHashCancelIterate(monitorSlowLogHash, pIter); + return; + } SEpSet ep = getEpSet_s(&pInst->mgmtEp); - bool isEnd = false; int64_t offset = 0; - char* data = readFileByLine(pClient->pFile, &offset, &isEnd); + int64_t size = getFileSize(pClient->path); + char* data = readFile(pClient->pFile, &offset, size); if(data != NULL && sendSlowLog(*clusterId, data, NULL, offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep) == 0){ quitCnt ++; } @@ -532,13 +561,20 @@ static void monitorSendAllSlowLog(){ int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL); SAppInstInfo* pInst = getAppInstByClusterId(*clusterId); SlowLogClient* pClient = (*(SlowLogClient**)pIter); + if (pClient == NULL){ + taosHashCancelIterate(monitorSlowLogHash, pIter); + return; + } if (pInst != NULL && t - pClient->lastCheckTime > pInst->monitorParas.tsMonitorInterval * 1000 && - pClient->offset == 0 && !isFileEmpty(pClient->path)) { + pClient->offset == 0) { + int64_t size = getFileSize(pClient->path); + if(size <= 0){ + continue; + } pClient->lastCheckTime = t; SEpSet ep = getEpSet_s(&pInst->mgmtEp); - bool isEnd = false; int64_t offset = 0; - char* data = readFileByLine(pClient->pFile, &offset, &isEnd); + char* data = readFile(pClient->pFile, &offset, size); if(data != NULL){ sendSlowLog(*clusterId, data, NULL, offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep); } @@ -586,7 +622,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ char filename[PATH_MAX] = {0}; snprintf(filename, sizeof(filename), "%s%s", tmpPath, name); - TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_STREAM | TD_FILE_TRUNC); + TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_TRUNC); if (pFile == NULL) { uError("failed to open file:%s since %s", filename, terrstr()); continue; diff --git a/source/client/test/clientMonitorTests.cpp b/source/client/test/clientMonitorTests.cpp index 2d3ce87f38..6285be5a4c 100644 --- a/source/client/test/clientMonitorTests.cpp +++ b/source/client/test/clientMonitorTests.cpp @@ -64,6 +64,65 @@ int main(int argc, char** argv) { // clusterMonitorClose(cluster2); //} +static char* readFile(TdFilePtr pFile, int64_t *offset, int64_t size){ + if(taosLSeekFile(pFile, *offset, SEEK_SET) < 0){ + uError("failed to seek file:%p code: %d", pFile, errno); + return NULL; + } + + ASSERT(size > *offset); + char* pCont = NULL; + int64_t totalSize = 0; + if (size - *offset >= SLOW_LOG_SEND_SIZE_MAX) { + pCont = (char*)taosMemoryCalloc(1, 2 * SLOW_LOG_SEND_SIZE_MAX); + totalSize = 2 * SLOW_LOG_SEND_SIZE_MAX; + }else{ + pCont = (char*)taosMemoryCalloc(1, 2 * (size - *offset)); + totalSize = 2 * (size - *offset); + } + + if(pCont == NULL){ + uError("failed to allocate memory for slow log, size:%" PRId64, totalSize); + return NULL; + } + char* buf = pCont; + strcat(buf++, "["); + int64_t readSize = taosReadFile(pFile, buf, SLOW_LOG_SEND_SIZE_MAX); + if (readSize <= 0) { + if (readSize < 0){ + uError("failed to read len from file:%p since %s", pFile, terrstr()); + } + taosMemoryFree(pCont); + return NULL; + } + + totalSize = 0; + while(1){ + size_t len = strlen(buf); + totalSize += (len+1); + if (totalSize > readSize || len == 0) { + *(buf-1) = ']'; + *buf = '\0'; + break; + } + buf[len] = ','; // replace '\0' with ',' + buf += (len + 1); + *offset += (len+1); + } + + uDebug("[monitor] monitorReadSendSlowLog slow log:%s", pCont); + return pCont; +} + +static int64_t getFileSize(char* path){ + int64_t fileSize = 0; + if (taosStatFile(path, &fileSize, NULL, NULL) < 0) { + return -1; + } + + return fileSize; +} + TEST(clientMonitorTest, sendTest) { TAOS* taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0); ASSERT_TRUE(taos != NULL); @@ -91,8 +150,8 @@ TEST(clientMonitorTest, ReadOneFile) { } const int batch = 10; - const int size = SLOW_LOG_SEND_SIZE/batch; - for(int i = 0; i < batch + 1; i++){ + const int size = 10; + for(int i = 0; i < batch; i++){ char value[size] = {0}; memset(value, '0' + i, size - 1); if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ @@ -106,64 +165,72 @@ TEST(clientMonitorTest, ReadOneFile) { // Create an SEpSet object and set it up for testing SEpSet* epSet = NULL; + int64_t fileSize = getFileSize("./tdengine-1-wewe"); // Call the function to be tested -// monitorReadSendSlowLog(pFile, (int64_t)pTransporter, epSet); - - char value[size] = {0}; - memset(value, '0', size - 1); - if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ - uError("failed to write len to file:%p since %s", pFile, terrstr()); + int64_t offset = 0; + while(1){ + if (offset >= fileSize) { + break; + } + char* val = readFile(pFile, &offset, fileSize); + printf("offset:%" PRId64",fileSize:%"PRId64",val:%s\n", offset, fileSize, val); } +// char value[size] = {0}; +// memset(value, '0', size - 1); +// if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ +// uError("failed to write len to file:%p since %s", pFile, terrstr()); +// } + // monitorReadSendSlowLog(pFile, (int64_t)pTransporter, epSet); // Clean up any resources created for testing taosCloseFile(&pFile); } -TEST(clientMonitorTest, ReadTwoFile) { - // Create a TdFilePtr object and set it up for testing - - TdFilePtr pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-1-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); - if (pFile == NULL) { - uError("failed to open file:./test.txt since %s", terrstr()); - return; - } - - const int batch = 10; - const int size = SLOW_LOG_SEND_SIZE/batch; - for(int i = 0; i < batch + 1; i++){ - char value[size] = {0}; - memset(value, '0' + i, size - 1); - if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ - uError("failed to write len to file:%p since %s", pFile, terrstr()); - } - } - - taosFsyncFile(pFile); - taosCloseFile(&pFile); - - pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-2-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); - if (pFile == NULL) { - uError("failed to open file:./test.txt since %s", terrstr()); - return; - } - - for(int i = 0; i < batch + 1; i++){ - char value[size] = {0}; - memset(value, '0' + i, size - 1); - if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ - uError("failed to write len to file:%p since %s", pFile, terrstr()); - } - } - - taosFsyncFile(pFile); - taosCloseFile(&pFile); - - SAppInstInfo pAppInfo = {0}; - pAppInfo.clusterId = 2; - pAppInfo.monitorParas.tsEnableMonitor = 1; - strcpy(tsTempDir,"/tmp"); -// monitorSendAllSlowLogFromTempDir(&pAppInfo); - -} \ No newline at end of file +//TEST(clientMonitorTest, ReadTwoFile) { +// // Create a TdFilePtr object and set it up for testing +// +// TdFilePtr pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-1-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); +// if (pFile == NULL) { +// uError("failed to open file:./test.txt since %s", terrstr()); +// return; +// } +// +// const int batch = 10; +// const int size = SLOW_LOG_SEND_SIZE/batch; +// for(int i = 0; i < batch + 1; i++){ +// char value[size] = {0}; +// memset(value, '0' + i, size - 1); +// if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ +// uError("failed to write len to file:%p since %s", pFile, terrstr()); +// } +// } +// +// taosFsyncFile(pFile); +// taosCloseFile(&pFile); +// +// pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-2-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); +// if (pFile == NULL) { +// uError("failed to open file:./test.txt since %s", terrstr()); +// return; +// } +// +// for(int i = 0; i < batch + 1; i++){ +// char value[size] = {0}; +// memset(value, '0' + i, size - 1); +// if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){ +// uError("failed to write len to file:%p since %s", pFile, terrstr()); +// } +// } +// +// taosFsyncFile(pFile); +// taosCloseFile(&pFile); +// +// SAppInstInfo pAppInfo = {0}; +// pAppInfo.clusterId = 2; +// pAppInfo.monitorParas.tsEnableMonitor = 1; +// strcpy(tsTempDir,"/tmp"); +//// monitorSendAllSlowLogFromTempDir(&pAppInfo); +// +//} \ No newline at end of file