diff --git a/include/libs/monitor/clientMonitor.h b/include/libs/monitor/clientMonitor.h index 4c7ab6f65a..0085173ecd 100644 --- a/include/libs/monitor/clientMonitor.h +++ b/include/libs/monitor/clientMonitor.h @@ -38,6 +38,13 @@ typedef enum { SLOW_LOG_READ_QUIT = 3, } SLOW_LOG_QUEUE_TYPE; +static char* queueTypeStr[] = { + "SLOW_LOG_WRITE", + "SLOW_LOG_READ_RUNNING", + "SLOW_LOG_READ_BEGINNIG", + "SLOW_LOG_READ_QUIT" +}; + #define SLOW_LOG_SEND_SIZE_MAX 1024*1024 typedef struct { @@ -65,7 +72,7 @@ typedef struct { } MonitorSlowLogData; void monitorClose(); -void monitorInit(); +int32_t monitorInit(); void monitorClientSQLReqInit(int64_t clusterKey); void monitorClientSlowQueryInit(int64_t clusterId); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 3a821768f8..ecfa1e3392 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -864,10 +864,15 @@ void taos_init_imp(void) { initQueryModuleMsgHandle(); if (taosConvInit() != 0) { + tscInitRes = -1; tscError("failed to init conv"); return; } - + if (monitorInit() != 0){ + tscInitRes = -1; + tscError("failed to init monitor"); + return; + } rpcInit(); SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100}; @@ -891,7 +896,6 @@ void taos_init_imp(void) { taosThreadMutexInit(&appInfo.mutex, NULL); tscCrashReportInit(); - monitorInit(); tscDebug("client is initialized successfully"); } diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index 479ea76fe3..d1a9897caa 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -18,6 +18,7 @@ int32_t quitCnt = 0; tsem2_t monitorSem; STaosQueue* monitorQueue; SHashObj* monitorSlowLogHash; +char tmpSlowLogPath[PATH_MAX] = {0}; static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size){ if (tsTempDir == NULL) { @@ -453,6 +454,10 @@ static int64_t getFileSize(char* path){ } static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type, char* fileName, void* pTransporter, SEpSet *epSet){ + if (data == NULL){ + taosMemoryFree(fileName); + return -1; + } MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData)); if(pParam == NULL){ taosMemoryFree(data); @@ -468,18 +473,26 @@ static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64 return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam); } -static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset, void* pTransporter, SEpSet *epSet){ +static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offset, int64_t size, SLOW_LOG_QUEUE_TYPE type, char* fileName){ + SAppInstInfo* pInst = getAppInstByClusterId(clusterId); + if(pInst == NULL){ + tscError("failed to get app instance by clusterId:%" PRId64, clusterId); + return -1; + } + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + char* data = readFile(pFile, offset, size); + return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName, pInst->pTransporter, &ep); +} + +static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset){ int64_t size = getFileSize(*fileName); if(size <= offset){ processFileInTheEnd(pFile, *fileName); tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName); }else{ - char* data = readFile(pFile, &offset, size); - if(data != NULL){ - sendSlowLog(clusterId, data, pFile, offset, SLOW_LOG_READ_BEGINNIG, *fileName, pTransporter, epSet); - *fileName = NULL; - } - tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log file:%p, data:%s", pFile, data); + int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName); + tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log clusterId:%"PRId64",ret:%d", clusterId, code); + *fileName = NULL; } } @@ -500,17 +513,8 @@ static void monitorSendSlowLogAtRunning(int64_t clusterId){ tscDebug("[monitor] monitorSendSlowLogAtRunning truncate file to 0 file:%p", pClient->pFile); pClient->offset = 0; }else{ - SAppInstInfo* pInst = getAppInstByClusterId(clusterId); - if(pInst == NULL){ - tscError("failed to get app instance by clusterId:%" PRId64, clusterId); - return; - } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - char* data = readFile(pClient->pFile, &pClient->offset, size); - if(data != NULL){ - sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep); - } - tscDebug("[monitor] monitorSendSlowLogAtRunning send slow log:%s", data); + int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL); + tscDebug("[monitor] monitorSendSlowLogAtRunning send slow log clusterId:%"PRId64",ret:%d", clusterId, code); } } @@ -532,16 +536,8 @@ static bool monitorSendSlowLogAtQuit(int64_t clusterId) { return true; } }else{ - SAppInstInfo* pInst = getAppInstByClusterId(clusterId); - if(pInst == NULL) { - return true; - } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - char* data = readFile(pClient->pFile, &pClient->offset, size); - if(data != NULL){ - sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep); - } - tscInfo("[monitor] monitorSendSlowLogAtQuit send slow log:%s", data); + int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL); + tscDebug("[monitor] monitorSendSlowLogAtQuit send slow log clusterId:%"PRId64",ret:%d", clusterId, code); } return false; } @@ -558,16 +554,11 @@ static void monitorSendAllSlowLogAtQuit(){ pClient->pFile = NULL; }else if(pClient->offset == 0){ int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL); - SAppInstInfo* pInst = getAppInstByClusterId(*clusterId); - if(pInst == NULL) { - continue; - } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - char* data = readFile(pClient->pFile, &pClient->offset, size); - if(data != NULL && sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep) == 0){ + int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL); + tscDebug("[monitor] monitorSendAllSlowLogAtQuit send slow log clusterId:%"PRId64",ret:%d", *clusterId, code); + if (code == 0){ quitCnt ++; } - tscInfo("[monitor] monitorSendAllSlowLogAtQuit send slow log :%s", data); } } } @@ -613,12 +604,8 @@ static void monitorSendAllSlowLog(){ } continue; } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - char* data = readFile(pClient->pFile, &pClient->offset, size); - if(data != NULL){ - sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep); - } - tscDebug("[monitor] monitorSendAllSlowLog send slow log :%s", data); + int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL); + tscDebug("[monitor] monitorSendAllSlowLog send slow log clusterId:%"PRId64",ret:%d", *clusterId, code); } } } @@ -631,7 +618,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ return; } char namePrefix[PATH_MAX] = {0}; - if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, pInst->clusterId) < 0) { + if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, clusterId) < 0) { tscError("failed to generate slow log file name prefix"); return; } @@ -656,7 +643,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || strstr(name, namePrefix) == NULL) { - tscInfo("skip file:%s, for cluster id:%"PRIx64, name, pInst->clusterId); + tscInfo("skip file:%s, for cluster id:%"PRIx64, name, clusterId); continue; } @@ -672,9 +659,8 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ taosCloseFile(&pFile); continue; } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); char *tmp = taosStrdup(filename); - monitorSendSlowLogAtBeginning(pInst->clusterId, &tmp, pFile, 0, pInst->pTransporter, &ep); + monitorSendSlowLogAtBeginning(clusterId, &tmp, pFile, 0); taosMemoryFree(tmp); } @@ -690,28 +676,6 @@ static void* monitorThreadFunc(void *param){ } #endif - char tmpPath[PATH_MAX] = {0}; - if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0){ - return NULL; - } - - if (taosMulModeMkDir(tmpPath, 0777, true) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - printf("failed to create dir:%s since %s", tmpPath, terrstr()); - return NULL; - } - - if (tsem2_init(&monitorSem, 0, 0) != 0) { - tscError("sem init error since %s", terrstr()); - return NULL; - } - - monitorQueue = taosOpenQueue(); - if(monitorQueue == NULL){ - tscError("open queue error since %s", terrstr()); - return NULL; - } - if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) { return NULL; } @@ -738,16 +702,12 @@ static void* monitorThreadFunc(void *param){ if (slowLogData != NULL) { if (slowLogData->type == SLOW_LOG_READ_BEGINNIG){ if(slowLogData->pFile != NULL){ - SAppInstInfo* pInst = getAppInstByClusterId(slowLogData->clusterId); - if(pInst != NULL) { - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile, slowLogData->offset, pInst->pTransporter, &ep); - } + monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile, slowLogData->offset); }else{ monitorSendAllSlowLogFromTempDir(slowLogData->clusterId); } } else if(slowLogData->type == SLOW_LOG_WRITE){ - monitorWriteSlowLog2File(slowLogData, tmpPath); + monitorWriteSlowLog2File(slowLogData, tmpSlowLogPath); } else if(slowLogData->type == SLOW_LOG_READ_RUNNING){ monitorSendSlowLogAtRunning(slowLogData->clusterId); } else if(slowLogData->type == SLOW_LOG_READ_QUIT){ @@ -799,27 +759,59 @@ static void tscMonitorStop() { } } -void monitorInit() { +int32_t monitorInit() { tscInfo("[monitor] tscMonitor init"); monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (monitorCounterHash == NULL) { tscError("failed to create monitorCounterHash"); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient); monitorSlowLogHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (monitorSlowLogHash == NULL) { tscError("failed to create monitorSlowLogHash"); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient); monitorTimer = taosTmrInit(0, 0, 0, "MONITOR"); if (monitorTimer == NULL) { tscError("failed to create monitor timer"); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath)) < 0){ + terrno = TSDB_CODE_TSC_INTERNAL_ERROR; + return -1; + } + + if (taosMulModeMkDir(tmpSlowLogPath, 0777, true) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + tscError("failed to create dir:%s since %s", tmpSlowLogPath, terrstr()); + return -1; + } + + if (tsem2_init(&monitorSem, 0, 0) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + tscError("sem init error since %s", terrstr()); + return -1; + } + + monitorQueue = taosOpenQueue(); + if(monitorQueue == NULL){ + tscError("open queue error since %s", terrstr()); + return -1; } taosInitRWLatch(&monitorLock); - tscMonitortInit(); + if (tscMonitortInit() != 0){ + return -1; + } + return 0; } void monitorClose() { @@ -844,10 +836,7 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data){ return -1; } *slowLogData = data; - tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%d", slowLogData->clusterId, slowLogData->type); - while (atomic_load_32(&slowLogFlag) == -1) { - taosMsleep(5); - } + tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%s, data:%s", slowLogData->clusterId, queueTypeStr[slowLogData->type], slowLogData->data); if (taosWriteQitem(monitorQueue, slowLogData) == 0){ tsem2_post(&monitorSem); }else{ diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index e5baa7137e..d587deffc5 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -154,13 +154,14 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { if(taosHashGet(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES) == NULL){ if(taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo, POINTER_BYTES) != 0){ tscError("failed to put appInfo into appInfo.pInstMapByClusterId"); + }else{ + MonitorSlowLogData data = {0}; + data.clusterId = pTscObj->pAppInfo->clusterId; + data.type = SLOW_LOG_READ_BEGINNIG; + monitorPutData2MonitorQueue(data); + monitorClientSlowQueryInit(connectRsp.clusterId); + monitorClientSQLReqInit(connectRsp.clusterId); } - MonitorSlowLogData data = {0}; - data.clusterId = pTscObj->pAppInfo->clusterId; - data.type = SLOW_LOG_READ_BEGINNIG; - monitorPutData2MonitorQueue(data); - monitorClientSlowQueryInit(connectRsp.clusterId); - monitorClientSQLReqInit(connectRsp.clusterId); } taosThreadMutexLock(&clientHbMgr.lock);