fix:[TS-4921]refactor reporting logic for slow log

This commit is contained in:
wangmm0220 2024-07-02 14:52:20 +08:00
parent 0f0fa840ea
commit 65696ce97f
3 changed files with 227 additions and 124 deletions

View File

@ -38,7 +38,7 @@ typedef enum {
SLOW_LOG_READ_QUIT = 2,
} SLOW_LOG_QUEUE_TYPE;
#define SLOW_LOG_SEND_SIZE 32*1024
#define SLOW_LOG_SEND_SIZE_MAX 128*1024*1024
typedef struct {
int64_t clusterId;

View File

@ -187,19 +187,19 @@ static void reportSendProcess(void* param, void* tmrId) {
}
static void sendAllCounter(){
MonitorClient** ppMonitor = (MonitorClient**)taosHashIterate(monitorCounterHash, NULL);
while (ppMonitor != NULL) {
MonitorClient** ppMonitor = NULL;
while ((ppMonitor = taosHashIterate(monitorSlowLogHash, ppMonitor))) {
MonitorClient* pMonitor = *ppMonitor;
if (pMonitor != NULL){
SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId);
if(pInst == NULL){
taosHashCancelIterate(monitorCounterHash, ppMonitor);
break;
}
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
if (pMonitor == NULL){
continue;
}
ppMonitor = taosHashIterate(monitorCounterHash, ppMonitor);
SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId);
if(pInst == NULL){
taosHashCancelIterate(monitorCounterHash, ppMonitor);
break;
}
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
}
}
@ -330,7 +330,7 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP
}
taosGetTmpfilePath(tmpPath, clusterId, path);
uInfo("[monitor] create slow log file:%s", path);
pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_STREAM);
pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to open file:%s since %s", path, terrstr());
@ -378,63 +378,70 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP
uDebug("[monitor] write slow log to file:%p, clusterId:%"PRIx64, pFile, slowLogData->clusterId);
}
static char* readFileByLine(TdFilePtr pFile, int64_t *offset, bool* isEnd){
static char* readFile(TdFilePtr pFile, int64_t *offset, int64_t size){
if(taosLSeekFile(pFile, *offset, SEEK_SET) < 0){
uError("failed to seek file:%p code: %d", pFile, errno);
return NULL;
}
ASSERT(size > *offset);
char* pCont = NULL;
int64_t totalSize = 0;
char* pCont = taosMemoryCalloc(1, SLOW_LOG_SEND_SIZE);
if (size - *offset >= SLOW_LOG_SEND_SIZE_MAX) {
pCont = taosMemoryCalloc(1, 2 * SLOW_LOG_SEND_SIZE_MAX);
totalSize = 2 * SLOW_LOG_SEND_SIZE_MAX;
}else{
pCont = taosMemoryCalloc(1, 2 * (size - *offset));
totalSize = 2 * (size - *offset);
}
if(pCont == NULL){
uError("failed to allocate memory for slow log, size:%" PRId64, totalSize);
return NULL;
}
strcat(pCont, "[");
while(1) {
char* pLine = NULL;
int64_t readLen = taosGetLineFile(pFile, &pLine);
if(totalSize + readLen >= SLOW_LOG_SEND_SIZE){
break;
char* buf = pCont;
strcat(buf++, "[");
int64_t readSize = taosReadFile(pFile, buf, SLOW_LOG_SEND_SIZE_MAX);
if (readSize <= 0) {
if (readSize < 0){
uError("failed to read len from file:%p since %s", pFile, terrstr());
}
if (readLen <= 0) {
if (readLen < 0) {
uError("failed to read len from file:%p since %s", pFile, terrstr());
}else if(totalSize == 0){
*isEnd = true;
}
break;
}
if (totalSize != 0) strcat(pCont, ",");
strcat(pCont, pLine);
totalSize += readLen;
}
strcat(pCont, "]");
uDebug("[monitor] monitorReadSendSlowLog slow log:%s", pCont);
*offset += totalSize;
if(*isEnd){
taosMemoryFree(pCont);
return NULL;
}
totalSize = 0;
while(1){
size_t len = strlen(buf);
totalSize += (len+1);
if (totalSize > readSize || len == 0) {
*(buf-1) = ']';
*buf = '\0';
break;
}
buf[len] = ','; // replace '\0' with ','
buf += (len + 1);
*offset += (len+1);
}
uDebug("[monitor] monitorReadSendSlowLog slow log:%s", pCont);
return pCont;
}
static bool isFileEmpty(char* path){
int64_t filesize = 0;
if (taosStatFile(path, &filesize, NULL, NULL) < 0) {
return false;
static int64_t getFileSize(char* path){
int64_t fileSize = 0;
if (taosStatFile(path, &fileSize, NULL, NULL) < 0) {
return -1;
}
if (filesize == 0) {
return true;
}
return false;
return fileSize;
}
static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type, char* fileName, void* pTransporter, SEpSet *epSet){
MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData));
if(pParam == NULL){
return -1;
}
pParam->data = data;
pParam->offset = offset;
pParam->clusterId = clusterId;
@ -445,15 +452,15 @@ static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64
}
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char* fileName, TdFilePtr pFile, int64_t offset, void* pTransporter, SEpSet *epSet){
bool isEnd = false;
char* data = readFileByLine(pFile, &offset, &isEnd);
if(isEnd){
int64_t size = getFileSize(fileName);
if(size <= offset){
taosFtruncateFile(pFile, 0);
taosUnLockFile(pFile);
taosCloseFile(&pFile);
taosRemoveFile(fileName);
uDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", fileName);
}else{
char* data = readFile(pFile, &offset, size);
if(data != NULL){
sendSlowLog(clusterId, data, pFile, offset, SLOW_LOG_READ_BEGINNIG, taosStrdup(fileName), pTransporter, epSet);
}
@ -463,46 +470,61 @@ static void monitorSendSlowLogAtBeginning(int64_t clusterId, char* fileName, TdF
static void monitorSendSlowLogAtRunning(int64_t clusterId){
void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
if (tmp == NULL){
return;
}
SlowLogClient* pClient = (*(SlowLogClient**)tmp);
bool isEnd = false;
char* data = readFileByLine(pClient->pFile, &pClient->offset, &isEnd);
if(isEnd){
if (pClient == NULL){
return;
}
int64_t size = getFileSize(pClient->path);
if(size <= pClient->offset){
if(taosFtruncateFile(pClient->pFile, 0) < 0){
uError("failed to truncate file:%p code: %d", pClient->pFile, errno);
}
pClient->offset = 0;
}else if(data != NULL){
}else{
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
if(pInst == NULL){
uError("failed to get app instance by clusterId:%" PRId64, clusterId);
return;
}
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep);
char* data = readFile(pClient->pFile, &pClient->offset, size);
if(data != NULL){
sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep);
}
uDebug("[monitor] monitorReadSendSlowLog send slow log:%s", data);
}
}
static bool monitorSendSlowLogAtQuit(int64_t clusterId) {
void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
if (tmp == NULL){
return true;
}
SlowLogClient* pClient = (*(SlowLogClient**)tmp);
bool isEnd = false;
char* data = readFileByLine(pClient->pFile, &pClient->offset, &isEnd);
if(isEnd){
if (pClient == NULL){
return true;
}
int64_t size = getFileSize(pClient->path);
if(size <= pClient->offset){
taosUnLockFile(pClient->pFile);
taosCloseFile(&(pClient->pFile));
taosRemoveFile(pClient->path);
if((--quitCnt) == 0){
return true;
}
}else if(data != NULL){
}else{
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
if(pInst == NULL) {
return true;
}
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep);
char* data = readFile(pClient->pFile, &pClient->offset, size);
if(data != NULL){
sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep);
}
uDebug("[monitor] monitorReadSendSlowLog send slow log:%s", data);
}
return false;
@ -510,14 +532,21 @@ static bool monitorSendSlowLogAtQuit(int64_t clusterId) {
static void monitorSendAllSlowLogAtQuit(){
void* pIter = NULL;
while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) {
SlowLogClient* pClient = (*(SlowLogClient**)pIter);
if(pClient == NULL) {
taosHashCancelIterate(monitorSlowLogHash, pIter);
return;
}
int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
SAppInstInfo* pInst = getAppInstByClusterId(*clusterId);
if(pInst == NULL) return;
SlowLogClient* pClient = (*(SlowLogClient**)pIter);
if(pInst == NULL) {
taosHashCancelIterate(monitorSlowLogHash, pIter);
return;
}
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
bool isEnd = false;
int64_t offset = 0;
char* data = readFileByLine(pClient->pFile, &offset, &isEnd);
int64_t size = getFileSize(pClient->path);
char* data = readFile(pClient->pFile, &offset, size);
if(data != NULL && sendSlowLog(*clusterId, data, NULL, offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep) == 0){
quitCnt ++;
}
@ -532,13 +561,20 @@ static void monitorSendAllSlowLog(){
int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
SAppInstInfo* pInst = getAppInstByClusterId(*clusterId);
SlowLogClient* pClient = (*(SlowLogClient**)pIter);
if (pClient == NULL){
taosHashCancelIterate(monitorSlowLogHash, pIter);
return;
}
if (pInst != NULL && t - pClient->lastCheckTime > pInst->monitorParas.tsMonitorInterval * 1000 &&
pClient->offset == 0 && !isFileEmpty(pClient->path)) {
pClient->offset == 0) {
int64_t size = getFileSize(pClient->path);
if(size <= 0){
continue;
}
pClient->lastCheckTime = t;
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
bool isEnd = false;
int64_t offset = 0;
char* data = readFileByLine(pClient->pFile, &offset, &isEnd);
char* data = readFile(pClient->pFile, &offset, size);
if(data != NULL){
sendSlowLog(*clusterId, data, NULL, offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep);
}
@ -586,7 +622,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
char filename[PATH_MAX] = {0};
snprintf(filename, sizeof(filename), "%s%s", tmpPath, name);
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_STREAM | TD_FILE_TRUNC);
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_TRUNC);
if (pFile == NULL) {
uError("failed to open file:%s since %s", filename, terrstr());
continue;

View File

@ -64,6 +64,65 @@ int main(int argc, char** argv) {
// clusterMonitorClose(cluster2);
//}
static char* readFile(TdFilePtr pFile, int64_t *offset, int64_t size){
if(taosLSeekFile(pFile, *offset, SEEK_SET) < 0){
uError("failed to seek file:%p code: %d", pFile, errno);
return NULL;
}
ASSERT(size > *offset);
char* pCont = NULL;
int64_t totalSize = 0;
if (size - *offset >= SLOW_LOG_SEND_SIZE_MAX) {
pCont = (char*)taosMemoryCalloc(1, 2 * SLOW_LOG_SEND_SIZE_MAX);
totalSize = 2 * SLOW_LOG_SEND_SIZE_MAX;
}else{
pCont = (char*)taosMemoryCalloc(1, 2 * (size - *offset));
totalSize = 2 * (size - *offset);
}
if(pCont == NULL){
uError("failed to allocate memory for slow log, size:%" PRId64, totalSize);
return NULL;
}
char* buf = pCont;
strcat(buf++, "[");
int64_t readSize = taosReadFile(pFile, buf, SLOW_LOG_SEND_SIZE_MAX);
if (readSize <= 0) {
if (readSize < 0){
uError("failed to read len from file:%p since %s", pFile, terrstr());
}
taosMemoryFree(pCont);
return NULL;
}
totalSize = 0;
while(1){
size_t len = strlen(buf);
totalSize += (len+1);
if (totalSize > readSize || len == 0) {
*(buf-1) = ']';
*buf = '\0';
break;
}
buf[len] = ','; // replace '\0' with ','
buf += (len + 1);
*offset += (len+1);
}
uDebug("[monitor] monitorReadSendSlowLog slow log:%s", pCont);
return pCont;
}
static int64_t getFileSize(char* path){
int64_t fileSize = 0;
if (taosStatFile(path, &fileSize, NULL, NULL) < 0) {
return -1;
}
return fileSize;
}
TEST(clientMonitorTest, sendTest) {
TAOS* taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
ASSERT_TRUE(taos != NULL);
@ -91,8 +150,8 @@ TEST(clientMonitorTest, ReadOneFile) {
}
const int batch = 10;
const int size = SLOW_LOG_SEND_SIZE/batch;
for(int i = 0; i < batch + 1; i++){
const int size = 10;
for(int i = 0; i < batch; i++){
char value[size] = {0};
memset(value, '0' + i, size - 1);
if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
@ -106,64 +165,72 @@ TEST(clientMonitorTest, ReadOneFile) {
// Create an SEpSet object and set it up for testing
SEpSet* epSet = NULL;
int64_t fileSize = getFileSize("./tdengine-1-wewe");
// Call the function to be tested
// monitorReadSendSlowLog(pFile, (int64_t)pTransporter, epSet);
char value[size] = {0};
memset(value, '0', size - 1);
if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
uError("failed to write len to file:%p since %s", pFile, terrstr());
int64_t offset = 0;
while(1){
if (offset >= fileSize) {
break;
}
char* val = readFile(pFile, &offset, fileSize);
printf("offset:%" PRId64",fileSize:%"PRId64",val:%s\n", offset, fileSize, val);
}
// char value[size] = {0};
// memset(value, '0', size - 1);
// if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
// uError("failed to write len to file:%p since %s", pFile, terrstr());
// }
// monitorReadSendSlowLog(pFile, (int64_t)pTransporter, epSet);
// Clean up any resources created for testing
taosCloseFile(&pFile);
}
TEST(clientMonitorTest, ReadTwoFile) {
// Create a TdFilePtr object and set it up for testing
TdFilePtr pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-1-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
if (pFile == NULL) {
uError("failed to open file:./test.txt since %s", terrstr());
return;
}
const int batch = 10;
const int size = SLOW_LOG_SEND_SIZE/batch;
for(int i = 0; i < batch + 1; i++){
char value[size] = {0};
memset(value, '0' + i, size - 1);
if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
uError("failed to write len to file:%p since %s", pFile, terrstr());
}
}
taosFsyncFile(pFile);
taosCloseFile(&pFile);
pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-2-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
if (pFile == NULL) {
uError("failed to open file:./test.txt since %s", terrstr());
return;
}
for(int i = 0; i < batch + 1; i++){
char value[size] = {0};
memset(value, '0' + i, size - 1);
if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
uError("failed to write len to file:%p since %s", pFile, terrstr());
}
}
taosFsyncFile(pFile);
taosCloseFile(&pFile);
SAppInstInfo pAppInfo = {0};
pAppInfo.clusterId = 2;
pAppInfo.monitorParas.tsEnableMonitor = 1;
strcpy(tsTempDir,"/tmp");
// monitorSendAllSlowLogFromTempDir(&pAppInfo);
}
//TEST(clientMonitorTest, ReadTwoFile) {
// // Create a TdFilePtr object and set it up for testing
//
// TdFilePtr pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-1-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
// if (pFile == NULL) {
// uError("failed to open file:./test.txt since %s", terrstr());
// return;
// }
//
// const int batch = 10;
// const int size = SLOW_LOG_SEND_SIZE/batch;
// for(int i = 0; i < batch + 1; i++){
// char value[size] = {0};
// memset(value, '0' + i, size - 1);
// if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
// uError("failed to write len to file:%p since %s", pFile, terrstr());
// }
// }
//
// taosFsyncFile(pFile);
// taosCloseFile(&pFile);
//
// pFile = taosOpenFile("/tmp/tdengine_slow_log/tdengine-2-wewe", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
// if (pFile == NULL) {
// uError("failed to open file:./test.txt since %s", terrstr());
// return;
// }
//
// for(int i = 0; i < batch + 1; i++){
// char value[size] = {0};
// memset(value, '0' + i, size - 1);
// if (taosWriteFile(pFile, value, strlen(value) + 1) < 0){
// uError("failed to write len to file:%p since %s", pFile, terrstr());
// }
// }
//
// taosFsyncFile(pFile);
// taosCloseFile(&pFile);
//
// SAppInstInfo pAppInfo = {0};
// pAppInfo.clusterId = 2;
// pAppInfo.monitorParas.tsEnableMonitor = 1;
// strcpy(tsTempDir,"/tmp");
//// monitorSendAllSlowLogFromTempDir(&pAppInfo);
//
//}