diff --git a/include/libs/monitor/clientMonitor.h b/include/libs/monitor/clientMonitor.h index 68b55e71a9..a8cdbe4ad1 100644 --- a/include/libs/monitor/clientMonitor.h +++ b/include/libs/monitor/clientMonitor.h @@ -43,7 +43,8 @@ typedef struct { typedef struct { TdFilePtr pFile; - void* timer; + int64_t lastCheckTime; + char path[PATH_MAX]; } SlowLogClient; typedef struct { diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index af396876bb..1248f87f19 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -202,24 +202,24 @@ static void deregisterRequest(SRequestObj *pRequest) { pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst); if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) { - if (pRequest->pQuery && pRequest->pQuery->pRoot) { - if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type && - (0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) { - tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64 - "us, planCost:%" PRId64 "us, exec:%" PRId64 "us", - duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, - pRequest->metric.planCostUs, pRequest->metric.execCostUs); - atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); - reqType = SLOW_LOG_TYPE_INSERT; - } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { - tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64 - "us, planCost:%" PRId64 "us, exec:%" PRId64 "us", - duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, - pRequest->metric.planCostUs, pRequest->metric.execCostUs); + if ((pRequest->pQuery && pRequest->pQuery->pRoot && + QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type && + (0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) || + QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType) { + tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64 + "us, planCost:%" PRId64 "us, exec:%" PRId64 "us", + duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, + pRequest->metric.planCostUs, pRequest->metric.execCostUs); + atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); + reqType = SLOW_LOG_TYPE_INSERT; + } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { + tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64 + "us, planCost:%" PRId64 "us, exec:%" PRId64 "us", + duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, + pRequest->metric.planCostUs, pRequest->metric.execCostUs); - atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); - reqType = SLOW_LOG_TYPE_QUERY; - } + atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); + reqType = SLOW_LOG_TYPE_QUERY; } nodesSimReleaseAllocator(pRequest->allocatorRefId); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 0a480e1cbd..07a9e0c463 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -549,12 +549,6 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo; int32_t oldInterval = pInst->monitorParas.tsMonitorInterval; pInst->monitorParas = pRsp.monitorParas; - if(oldInterval > pInst->monitorParas.tsMonitorInterval){ - char* value = taosStrdup(""); - if(monitorPutData2MonitorQueue(pInst->clusterId, value) < 0){ - taosMemoryFree(value); - } - } tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope); diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index 260a0ebbaa..5911d5d04c 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -45,19 +45,6 @@ static void destroySlowLogClient(void* data){ return; } SlowLogClient* slowLogClient = *(SlowLogClient**)data; - if(slowLogClient == NULL){ - return; - } - taosTmrStopA(&(*(SlowLogClient**)data)->timer); - - TdFilePtr pFile = slowLogClient->pFile; - if(pFile == NULL){ - taosMemoryFree(slowLogClient); - return; - } - - taosUnLockFile(pFile); - taosCloseFile(&pFile); taosMemoryFree(slowLogClient); } @@ -85,6 +72,11 @@ static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) { return *(SAppInstInfo**)p; } +typedef struct { + tsem_t sem; + int32_t code; +} SlowLogParam; + static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) { if (TSDB_CODE_SUCCESS != code) { uError("found error in monitorReport send callback, code:%d, please check the network.", code); @@ -93,10 +85,15 @@ static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); } + if(param != NULL){ + SlowLogParam* p = (SlowLogParam*)param; + p->code = code; + tsem_post(&p->sem); + } return code; } -static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITOR_TYPE type) { +static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITOR_TYPE type, void* param) { SStatisReq sStatisReq; sStatisReq.pCont = pCont; sStatisReq.contLen = strlen(pCont); @@ -122,9 +119,8 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO pInfo->msgInfo.pData = buf; pInfo->msgInfo.len = tlen; pInfo->msgType = TDMT_MND_STATIS; - // pInfo->param = taosMemoryMalloc(sizeof(int32_t)); - // *(int32_t*)pInfo->param = i; - pInfo->paramFreeFp = taosMemoryFree; + pInfo->param = param; +// pInfo->paramFreeFp = taosMemoryFree; pInfo->requestId = tGenIdPI64(); pInfo->requestObjRefId = 0; @@ -136,20 +132,22 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO return code; } -static void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *epSet){ +static bool monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *epSet){ char buf[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log char pCont[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log int32_t offset = 0; if(taosLSeekFile(pFile, 0, SEEK_SET) < 0){ uError("failed to seek file:%p code: %d", pFile, errno); - return; + return false; } while(1){ int64_t readSize = taosReadFile(pFile, buf + offset, SLOW_LOG_SEND_SIZE - offset); if (readSize <= 0) { - if (readSize < 0) + if (readSize < 0){ uError("failed to read len from file:%p since %s", pFile, terrstr()); - return; + return false; + } + break; } memset(pCont, 0, sizeof(pCont)); @@ -165,13 +163,30 @@ static void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet * } strcat(pCont, "]"); if (pTransporter && pCont != NULL) { - if(sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_SLOW_LOG) != 0){ + SlowLogParam* pParam = taosMemoryMalloc(sizeof(SlowLogParam)); + if (pParam == NULL) { + return false; + } + if (tsem_init(&pParam->sem, 0, 0) != 0){ + taosMemoryFree(pParam); + return false; + } + pParam->code = sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_SLOW_LOG, pParam); + if(pParam->code == TSDB_CODE_SUCCESS){ + tsem_wait(&pParam->sem); + } + tsem_destroy(&pParam->sem); + + if(pParam->code != TSDB_CODE_SUCCESS){ if(taosLSeekFile(pFile, -readSize, SEEK_CUR) < 0){ uError("failed to seek file:%p code: %d", pFile, errno); } uError("failed to send report:%s", pCont); - return; + taosMemoryFree(pParam); + return false; } + taosMemoryFree(pParam); + uDebug("[monitor] monitorReadSendSlowLog send slow log to mnode:%s", pCont) } @@ -188,6 +203,7 @@ static void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet * uError("failed to truncate file:%p code: %d", pFile, errno); } uDebug("[monitor] monitorReadSendSlowLog send slow log file:%p", pFile); + return true; } static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet *epSet) { @@ -199,7 +215,7 @@ static void generateClusterReport(taos_collector_registry_t* registry, void* pTr return; } - if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER) == 0) { + if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER, NULL) == 0) { taos_collector_registry_clear_batch(registry); } taosMemoryFreeClear(pCont); @@ -225,25 +241,6 @@ static void reportSendProcess(void* param, void* tmrId) { taosRUnLockLatch(&monitorLock); } -static void sendAllSlowLog(){ - void* data = taosHashIterate(monitorSlowLogHash, NULL); - while (data != NULL) { - TdFilePtr pFile = (*(SlowLogClient**)data)->pFile; - if (pFile != NULL){ - int64_t clusterId = *(int64_t*)taosHashGetKey(data, NULL); - SAppInstInfo* pInst = getAppInstByClusterId(clusterId); - if(pInst == NULL){ - taosHashCancelIterate(monitorSlowLogHash, data); - break; - } - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - monitorReadSendSlowLog(pFile, pInst->pTransporter, &ep); - } - data = taosHashIterate(monitorSlowLogHash, data); - } - uDebug("[monitor] sendAllSlowLog when client close"); -} - static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId); @@ -257,16 +254,14 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ return; } - taosRLockLatch(&monitorLock); - char tmpPath[PATH_MAX] = {0}; if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) { - goto END; + return; } TdDirPtr pDir = taosOpenDir(tmpPath); if (pDir == NULL) { - goto END; + return; } TdDirEntryPtr de = NULL; @@ -296,18 +291,18 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ continue; } SEpSet ep = getEpSet_s(&pInst->mgmtEp); - monitorReadSendSlowLog(pFile, pInst->pTransporter, &ep); + bool truncated = monitorReadSendSlowLog(pFile, pInst->pTransporter, &ep); taosUnLockFile(pFile); taosCloseFile(&pFile); - taosRemoveFile(filename); + + if(truncated){ + taosRemoveFile(filename); + } uDebug("[monitor] send and delete slow log file when reveive connect rsp:%s", filename); } taosCloseDir(&pDir); - -END: - taosRUnLockLatch(&monitorLock); } static void sendAllCounter(){ @@ -449,37 +444,7 @@ static void monitorFreeSlowLogData(MonitorSlowLogData* pData) { static void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); } -static void reportSlowLog(void* param, void* tmrId) { - taosWLockLatch(&monitorLock); - if (atomic_load_32(&monitorFlag) == 1) { - taosRUnLockLatch(&monitorLock); - return; - } - SAppInstInfo* pInst = getAppInstByClusterId((int64_t)param); - if(pInst == NULL){ - uError("failed to get app inst, clusterId:%"PRIx64, (int64_t)param); - taosRUnLockLatch(&monitorLock); - return; - } - - void* tmp = taosHashGet(monitorSlowLogHash, ¶m, LONG_BYTES); - if(tmp == NULL){ - uError("failed to get file inst, clusterId:%"PRIx64, (int64_t)param); - taosRUnLockLatch(&monitorLock); - return; - } - - SEpSet ep = getEpSet_s(&pInst->mgmtEp); - monitorReadSendSlowLog((*(SlowLogClient**)tmp)->pFile, pInst->pTransporter, &ep); - - if((*(SlowLogClient**)tmp)->timer == tmrId){ - taosTmrReset(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &(*(SlowLogClient**)tmp)->timer); - } - taosWUnLockLatch(&monitorLock); -} - static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpPath){ - taosWLockLatch(&monitorLock); TdFilePtr pFile = NULL; void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES); if (tmp == NULL){ @@ -487,7 +452,7 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP char clusterId[32] = {0}; if (snprintf(clusterId, sizeof(clusterId), "%" PRIx64, slowLogData->clusterId) < 0){ uError("failed to generate clusterId:%" PRIx64, slowLogData->clusterId); - goto FAILED; + return; } taosGetTmpfilePath(tmpPath, clusterId, path); uInfo("[monitor] create slow log file:%s", path); @@ -495,35 +460,35 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to open file:%s since %s", path, terrstr()); - goto FAILED; + return; } SlowLogClient *pClient = taosMemoryCalloc(1, sizeof(SlowLogClient)); if (pClient == NULL){ uError("failed to allocate memory for slow log client"); taosCloseFile(&pFile); - goto FAILED; + return; } + pClient->lastCheckTime = taosGetMonoTimestampMs(); + strcpy(pClient->path, path); pClient->pFile = pFile; if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0){ uError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId); taosCloseFile(&pFile); taosMemoryFree(pClient); - goto FAILED; + return; } if(taosLockFile(pFile) < 0){ uError("failed to lock file:%p since %s", pFile, terrstr()); - goto FAILED; + return; } SAppInstInfo* pInst = getAppInstByClusterId(slowLogData->clusterId); if(pInst == NULL){ uError("failed to get app instance by clusterId:%" PRId64, slowLogData->clusterId); - goto FAILED; + return; } - - pClient->timer = taosTmrStart(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, (void*)slowLogData->clusterId, monitorTimer); }else{ pFile = (*(SlowLogClient**)tmp)->pFile; } @@ -532,26 +497,30 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP uError("failed to write len to file:%p since %s", pFile, terrstr()); } uDebug("[monitor] write slow log to file:%p, clusterId:%"PRIx64, pFile, slowLogData->clusterId); - -FAILED: - taosWUnLockLatch(&monitorLock); } -static void restartReportTimer(int64_t clusterId){ - taosWLockLatch(&monitorLock); +static void monitorSendAllSlowLog(bool quit){ + int64_t t = taosGetMonoTimestampMs(); - void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES); - if(tmp){ - taosTmrStopA(&(*(SlowLogClient**)tmp)->timer); - SAppInstInfo* pInst = getAppInstByClusterId(clusterId); - if(pInst == NULL){ - uError("failed to get app inst, clusterId:%"PRIx64, clusterId); - return; + void* pIter = NULL; + while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) { + int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL); + SAppInstInfo* pInst = getAppInstByClusterId(*clusterId); + bool truncated = false; + if(quit || (pInst != NULL && t - (*(SlowLogClient**)pIter)->lastCheckTime > pInst->monitorParas.tsMonitorInterval * 1000)) { + (*(SlowLogClient**)pIter)->lastCheckTime = t; + SEpSet ep = getEpSet_s(&pInst->mgmtEp); + truncated = monitorReadSendSlowLog((*(SlowLogClient**)pIter)->pFile, pInst->pTransporter, &ep); } - (*(SlowLogClient**)tmp)->timer = taosTmrStart(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, (void*)clusterId, monitorTimer); + if(quit){ + taosUnLockFile((*(SlowLogClient**)pIter)->pFile); + taosCloseFile(&((*(SlowLogClient**)pIter)->pFile)); + if(truncated){ + taosRemoveFile((*(SlowLogClient**)pIter)->path); + } + } } - taosWUnLockLatch(&monitorLock); } static void* monitorThreadFunc(void *param){ @@ -590,7 +559,10 @@ static void* monitorThreadFunc(void *param){ } uDebug("monitorThreadFunc start"); while (1) { - if (slowLogFlag > 0) break; + if (slowLogFlag > 0) { + monitorSendAllSlowLog(true); + break; + } MonitorSlowLogData* slowLogData = NULL; taosReadQitem(monitorQueue, (void**)&slowLogData); @@ -598,15 +570,15 @@ static void* monitorThreadFunc(void *param){ uDebug("[monitor] read slow log data from queue, clusterId:%" PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value); if (slowLogData->value == NULL){ monitorSendAllSlowLogFromTempDir(slowLogData->clusterId); - } else if(strlen(slowLogData->value) == 0){ - restartReportTimer(slowLogData->clusterId); } else{ monitorWriteSlowLog2File(slowLogData, tmpPath); } } monitorFreeSlowLogData(slowLogData); taosFreeQitem(slowLogData); - tsem2_timewait(&monitorSem, 500); + + monitorSendAllSlowLog(false); + tsem2_timewait(&monitorSem, 100); } taosCloseQueue(monitorQueue); @@ -671,7 +643,6 @@ void monitorClose() { uDebug("[monitor] monitorFlag is not 0"); } tscMonitorStop(); - sendAllSlowLog(); sendAllCounter(); taosHashCleanup(monitorCounterHash); taosHashCleanup(monitorSlowLogHash);