fix:[TS-4921]refactor code
This commit is contained in:
parent
20c8e3168c
commit
6c5acdfc4b
|
@ -38,6 +38,13 @@ typedef enum {
|
|||
SLOW_LOG_READ_QUIT = 3,
|
||||
} SLOW_LOG_QUEUE_TYPE;
|
||||
|
||||
char* queueTypeStr[] = {
|
||||
"SLOW_LOG_WRITE",
|
||||
"SLOW_LOG_READ_RUNNING",
|
||||
"SLOW_LOG_READ_BEGINNIG",
|
||||
"SLOW_LOG_READ_QUIT"
|
||||
};
|
||||
|
||||
#define SLOW_LOG_SEND_SIZE_MAX 1024*1024
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -454,6 +454,10 @@ static int64_t getFileSize(char* path){
|
|||
}
|
||||
|
||||
static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type, char* fileName, void* pTransporter, SEpSet *epSet){
|
||||
if (data == NULL){
|
||||
taosMemoryFree(fileName);
|
||||
return -1;
|
||||
}
|
||||
MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData));
|
||||
if(pParam == NULL){
|
||||
taosMemoryFree(data);
|
||||
|
@ -469,17 +473,26 @@ static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64
|
|||
return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam);
|
||||
}
|
||||
|
||||
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset, void* pTransporter, SEpSet *epSet){
|
||||
static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offset, int64_t size, SLOW_LOG_QUEUE_TYPE type, char* fileName){
|
||||
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
|
||||
if(pInst == NULL){
|
||||
tscError("failed to get app instance by clusterId:%" PRId64, clusterId);
|
||||
return -1;
|
||||
}
|
||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||
char* data = readFile(pFile, offset, size);
|
||||
return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName, pInst->pTransporter, &ep);
|
||||
}
|
||||
|
||||
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset){
|
||||
int64_t size = getFileSize(*fileName);
|
||||
if(size <= offset){
|
||||
processFileInTheEnd(pFile, *fileName);
|
||||
tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName);
|
||||
}else{
|
||||
char* data = readFile(pFile, &offset, size);
|
||||
if(data != NULL){
|
||||
sendSlowLog(clusterId, data, pFile, offset, SLOW_LOG_READ_BEGINNIG, *fileName, pTransporter, epSet);
|
||||
*fileName = NULL;
|
||||
}
|
||||
int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName);
|
||||
tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log clusterId:%"PRId64",ret:%d", clusterId, code);
|
||||
*fileName = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -500,16 +513,8 @@ static void monitorSendSlowLogAtRunning(int64_t clusterId){
|
|||
tscDebug("[monitor] monitorSendSlowLogAtRunning truncate file to 0 file:%p", pClient->pFile);
|
||||
pClient->offset = 0;
|
||||
}else{
|
||||
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
|
||||
if(pInst == NULL){
|
||||
tscError("failed to get app instance by clusterId:%" PRId64, clusterId);
|
||||
return;
|
||||
}
|
||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||
char* data = readFile(pClient->pFile, &pClient->offset, size);
|
||||
if(data != NULL){
|
||||
sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep);
|
||||
}
|
||||
int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL);
|
||||
tscDebug("[monitor] monitorSendSlowLogAtRunning send slow log clusterId:%"PRId64",ret:%d", clusterId, code);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -531,15 +536,8 @@ static bool monitorSendSlowLogAtQuit(int64_t clusterId) {
|
|||
return true;
|
||||
}
|
||||
}else{
|
||||
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
|
||||
if(pInst == NULL) {
|
||||
return true;
|
||||
}
|
||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||
char* data = readFile(pClient->pFile, &pClient->offset, size);
|
||||
if(data != NULL){
|
||||
sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep);
|
||||
}
|
||||
int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL);
|
||||
tscDebug("[monitor] monitorSendSlowLogAtQuit send slow log clusterId:%"PRId64",ret:%d", clusterId, code);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
@ -556,13 +554,9 @@ static void monitorSendAllSlowLogAtQuit(){
|
|||
pClient->pFile = NULL;
|
||||
}else if(pClient->offset == 0){
|
||||
int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
|
||||
SAppInstInfo* pInst = getAppInstByClusterId(*clusterId);
|
||||
if(pInst == NULL) {
|
||||
continue;
|
||||
}
|
||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||
char* data = readFile(pClient->pFile, &pClient->offset, size);
|
||||
if(data != NULL && sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep) == 0){
|
||||
int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL);
|
||||
tscDebug("[monitor] monitorSendAllSlowLogAtQuit send slow log clusterId:%"PRId64",ret:%d", *clusterId, code);
|
||||
if (code == 0){
|
||||
quitCnt ++;
|
||||
}
|
||||
}
|
||||
|
@ -610,11 +604,8 @@ static void monitorSendAllSlowLog(){
|
|||
}
|
||||
continue;
|
||||
}
|
||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||
char* data = readFile(pClient->pFile, &pClient->offset, size);
|
||||
if(data != NULL){
|
||||
sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep);
|
||||
}
|
||||
int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL);
|
||||
tscDebug("[monitor] monitorSendAllSlowLog send slow log clusterId:%"PRId64",ret:%d", *clusterId, code);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -627,7 +618,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
|
|||
return;
|
||||
}
|
||||
char namePrefix[PATH_MAX] = {0};
|
||||
if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, pInst->clusterId) < 0) {
|
||||
if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, clusterId) < 0) {
|
||||
tscError("failed to generate slow log file name prefix");
|
||||
return;
|
||||
}
|
||||
|
@ -652,7 +643,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
|
|||
if (strcmp(name, ".") == 0 ||
|
||||
strcmp(name, "..") == 0 ||
|
||||
strstr(name, namePrefix) == NULL) {
|
||||
tscInfo("skip file:%s, for cluster id:%"PRIx64, name, pInst->clusterId);
|
||||
tscInfo("skip file:%s, for cluster id:%"PRIx64, name, clusterId);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -668,9 +659,8 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
|
|||
taosCloseFile(&pFile);
|
||||
continue;
|
||||
}
|
||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||
char *tmp = taosStrdup(filename);
|
||||
monitorSendSlowLogAtBeginning(pInst->clusterId, &tmp, pFile, 0, pInst->pTransporter, &ep);
|
||||
monitorSendSlowLogAtBeginning(clusterId, &tmp, pFile, 0);
|
||||
taosMemoryFree(tmp);
|
||||
}
|
||||
|
||||
|
@ -712,11 +702,7 @@ static void* monitorThreadFunc(void *param){
|
|||
if (slowLogData != NULL) {
|
||||
if (slowLogData->type == SLOW_LOG_READ_BEGINNIG){
|
||||
if(slowLogData->pFile != NULL){
|
||||
SAppInstInfo* pInst = getAppInstByClusterId(slowLogData->clusterId);
|
||||
if(pInst != NULL) {
|
||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||
monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile, slowLogData->offset, pInst->pTransporter, &ep);
|
||||
}
|
||||
monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile, slowLogData->offset);
|
||||
}else{
|
||||
monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);
|
||||
}
|
||||
|
@ -850,7 +836,7 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data){
|
|||
return -1;
|
||||
}
|
||||
*slowLogData = data;
|
||||
tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%d", slowLogData->clusterId, slowLogData->type);
|
||||
tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%s, data:%s", slowLogData->clusterId, queueTypeStr[slowLogData->type], slowLogData->data);
|
||||
if (taosWriteQitem(monitorQueue, slowLogData) == 0){
|
||||
tsem2_post(&monitorSem);
|
||||
}else{
|
||||
|
|
Loading…
Reference in New Issue