fix:remove using alarm when sending slow log & use one thread to send all slow log
This commit is contained in:
parent
b5a29541a3
commit
328dfa3146
|
@ -43,7 +43,8 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
TdFilePtr pFile;
|
TdFilePtr pFile;
|
||||||
void* timer;
|
int64_t lastCheckTime;
|
||||||
|
char path[PATH_MAX];
|
||||||
} SlowLogClient;
|
} SlowLogClient;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -202,24 +202,24 @@ static void deregisterRequest(SRequestObj *pRequest) {
|
||||||
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
|
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) {
|
if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) {
|
||||||
if (pRequest->pQuery && pRequest->pQuery->pRoot) {
|
if ((pRequest->pQuery && pRequest->pQuery->pRoot &&
|
||||||
if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type &&
|
QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type &&
|
||||||
(0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) {
|
(0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) ||
|
||||||
tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
|
QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType) {
|
||||||
"us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
|
tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
|
||||||
duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
|
"us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
|
||||||
pRequest->metric.planCostUs, pRequest->metric.execCostUs);
|
duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
|
||||||
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
|
pRequest->metric.planCostUs, pRequest->metric.execCostUs);
|
||||||
reqType = SLOW_LOG_TYPE_INSERT;
|
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
|
||||||
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
|
reqType = SLOW_LOG_TYPE_INSERT;
|
||||||
tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
|
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
|
||||||
"us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
|
tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
|
||||||
duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
|
"us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
|
||||||
pRequest->metric.planCostUs, pRequest->metric.execCostUs);
|
duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
|
||||||
|
pRequest->metric.planCostUs, pRequest->metric.execCostUs);
|
||||||
|
|
||||||
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
|
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
|
||||||
reqType = SLOW_LOG_TYPE_QUERY;
|
reqType = SLOW_LOG_TYPE_QUERY;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
nodesSimReleaseAllocator(pRequest->allocatorRefId);
|
nodesSimReleaseAllocator(pRequest->allocatorRefId);
|
||||||
|
|
|
@ -549,12 +549,6 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
|
||||||
SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo;
|
SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo;
|
||||||
int32_t oldInterval = pInst->monitorParas.tsMonitorInterval;
|
int32_t oldInterval = pInst->monitorParas.tsMonitorInterval;
|
||||||
pInst->monitorParas = pRsp.monitorParas;
|
pInst->monitorParas = pRsp.monitorParas;
|
||||||
if(oldInterval > pInst->monitorParas.tsMonitorInterval){
|
|
||||||
char* value = taosStrdup("");
|
|
||||||
if(monitorPutData2MonitorQueue(pInst->clusterId, value) < 0){
|
|
||||||
taosMemoryFree(value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d",
|
tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d",
|
||||||
pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);
|
pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);
|
||||||
|
|
||||||
|
|
|
@ -45,19 +45,6 @@ static void destroySlowLogClient(void* data){
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
SlowLogClient* slowLogClient = *(SlowLogClient**)data;
|
SlowLogClient* slowLogClient = *(SlowLogClient**)data;
|
||||||
if(slowLogClient == NULL){
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
taosTmrStopA(&(*(SlowLogClient**)data)->timer);
|
|
||||||
|
|
||||||
TdFilePtr pFile = slowLogClient->pFile;
|
|
||||||
if(pFile == NULL){
|
|
||||||
taosMemoryFree(slowLogClient);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosUnLockFile(pFile);
|
|
||||||
taosCloseFile(&pFile);
|
|
||||||
taosMemoryFree(slowLogClient);
|
taosMemoryFree(slowLogClient);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,6 +72,11 @@ static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) {
|
||||||
return *(SAppInstInfo**)p;
|
return *(SAppInstInfo**)p;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
tsem_t sem;
|
||||||
|
int32_t code;
|
||||||
|
} SlowLogParam;
|
||||||
|
|
||||||
static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
|
static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
uError("found error in monitorReport send callback, code:%d, please check the network.", code);
|
uError("found error in monitorReport send callback, code:%d, please check the network.", code);
|
||||||
|
@ -93,10 +85,15 @@ static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
taosMemoryFree(pMsg->pData);
|
taosMemoryFree(pMsg->pData);
|
||||||
taosMemoryFree(pMsg->pEpSet);
|
taosMemoryFree(pMsg->pEpSet);
|
||||||
}
|
}
|
||||||
|
if(param != NULL){
|
||||||
|
SlowLogParam* p = (SlowLogParam*)param;
|
||||||
|
p->code = code;
|
||||||
|
tsem_post(&p->sem);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITOR_TYPE type) {
|
static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITOR_TYPE type, void* param) {
|
||||||
SStatisReq sStatisReq;
|
SStatisReq sStatisReq;
|
||||||
sStatisReq.pCont = pCont;
|
sStatisReq.pCont = pCont;
|
||||||
sStatisReq.contLen = strlen(pCont);
|
sStatisReq.contLen = strlen(pCont);
|
||||||
|
@ -122,9 +119,8 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO
|
||||||
pInfo->msgInfo.pData = buf;
|
pInfo->msgInfo.pData = buf;
|
||||||
pInfo->msgInfo.len = tlen;
|
pInfo->msgInfo.len = tlen;
|
||||||
pInfo->msgType = TDMT_MND_STATIS;
|
pInfo->msgType = TDMT_MND_STATIS;
|
||||||
// pInfo->param = taosMemoryMalloc(sizeof(int32_t));
|
pInfo->param = param;
|
||||||
// *(int32_t*)pInfo->param = i;
|
// pInfo->paramFreeFp = taosMemoryFree;
|
||||||
pInfo->paramFreeFp = taosMemoryFree;
|
|
||||||
pInfo->requestId = tGenIdPI64();
|
pInfo->requestId = tGenIdPI64();
|
||||||
pInfo->requestObjRefId = 0;
|
pInfo->requestObjRefId = 0;
|
||||||
|
|
||||||
|
@ -136,20 +132,22 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *epSet){
|
static bool monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *epSet){
|
||||||
char buf[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log
|
char buf[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log
|
||||||
char pCont[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log
|
char pCont[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
if(taosLSeekFile(pFile, 0, SEEK_SET) < 0){
|
if(taosLSeekFile(pFile, 0, SEEK_SET) < 0){
|
||||||
uError("failed to seek file:%p code: %d", pFile, errno);
|
uError("failed to seek file:%p code: %d", pFile, errno);
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
while(1){
|
while(1){
|
||||||
int64_t readSize = taosReadFile(pFile, buf + offset, SLOW_LOG_SEND_SIZE - offset);
|
int64_t readSize = taosReadFile(pFile, buf + offset, SLOW_LOG_SEND_SIZE - offset);
|
||||||
if (readSize <= 0) {
|
if (readSize <= 0) {
|
||||||
if (readSize < 0)
|
if (readSize < 0){
|
||||||
uError("failed to read len from file:%p since %s", pFile, terrstr());
|
uError("failed to read len from file:%p since %s", pFile, terrstr());
|
||||||
return;
|
return false;
|
||||||
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(pCont, 0, sizeof(pCont));
|
memset(pCont, 0, sizeof(pCont));
|
||||||
|
@ -165,13 +163,30 @@ static void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *
|
||||||
}
|
}
|
||||||
strcat(pCont, "]");
|
strcat(pCont, "]");
|
||||||
if (pTransporter && pCont != NULL) {
|
if (pTransporter && pCont != NULL) {
|
||||||
if(sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_SLOW_LOG) != 0){
|
SlowLogParam* pParam = taosMemoryMalloc(sizeof(SlowLogParam));
|
||||||
|
if (pParam == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (tsem_init(&pParam->sem, 0, 0) != 0){
|
||||||
|
taosMemoryFree(pParam);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
pParam->code = sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_SLOW_LOG, pParam);
|
||||||
|
if(pParam->code == TSDB_CODE_SUCCESS){
|
||||||
|
tsem_wait(&pParam->sem);
|
||||||
|
}
|
||||||
|
tsem_destroy(&pParam->sem);
|
||||||
|
|
||||||
|
if(pParam->code != TSDB_CODE_SUCCESS){
|
||||||
if(taosLSeekFile(pFile, -readSize, SEEK_CUR) < 0){
|
if(taosLSeekFile(pFile, -readSize, SEEK_CUR) < 0){
|
||||||
uError("failed to seek file:%p code: %d", pFile, errno);
|
uError("failed to seek file:%p code: %d", pFile, errno);
|
||||||
}
|
}
|
||||||
uError("failed to send report:%s", pCont);
|
uError("failed to send report:%s", pCont);
|
||||||
return;
|
taosMemoryFree(pParam);
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
taosMemoryFree(pParam);
|
||||||
|
|
||||||
uDebug("[monitor] monitorReadSendSlowLog send slow log to mnode:%s", pCont)
|
uDebug("[monitor] monitorReadSendSlowLog send slow log to mnode:%s", pCont)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,6 +203,7 @@ static void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *
|
||||||
uError("failed to truncate file:%p code: %d", pFile, errno);
|
uError("failed to truncate file:%p code: %d", pFile, errno);
|
||||||
}
|
}
|
||||||
uDebug("[monitor] monitorReadSendSlowLog send slow log file:%p", pFile);
|
uDebug("[monitor] monitorReadSendSlowLog send slow log file:%p", pFile);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet *epSet) {
|
static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet *epSet) {
|
||||||
|
@ -199,7 +215,7 @@ static void generateClusterReport(taos_collector_registry_t* registry, void* pTr
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER) == 0) {
|
if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER, NULL) == 0) {
|
||||||
taos_collector_registry_clear_batch(registry);
|
taos_collector_registry_clear_batch(registry);
|
||||||
}
|
}
|
||||||
taosMemoryFreeClear(pCont);
|
taosMemoryFreeClear(pCont);
|
||||||
|
@ -225,25 +241,6 @@ static void reportSendProcess(void* param, void* tmrId) {
|
||||||
taosRUnLockLatch(&monitorLock);
|
taosRUnLockLatch(&monitorLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void sendAllSlowLog(){
|
|
||||||
void* data = taosHashIterate(monitorSlowLogHash, NULL);
|
|
||||||
while (data != NULL) {
|
|
||||||
TdFilePtr pFile = (*(SlowLogClient**)data)->pFile;
|
|
||||||
if (pFile != NULL){
|
|
||||||
int64_t clusterId = *(int64_t*)taosHashGetKey(data, NULL);
|
|
||||||
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
|
|
||||||
if(pInst == NULL){
|
|
||||||
taosHashCancelIterate(monitorSlowLogHash, data);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
|
||||||
monitorReadSendSlowLog(pFile, pInst->pTransporter, &ep);
|
|
||||||
}
|
|
||||||
data = taosHashIterate(monitorSlowLogHash, data);
|
|
||||||
}
|
|
||||||
uDebug("[monitor] sendAllSlowLog when client close");
|
|
||||||
}
|
|
||||||
|
|
||||||
static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
|
static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
|
||||||
SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId);
|
SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId);
|
||||||
|
|
||||||
|
@ -257,16 +254,14 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRLockLatch(&monitorLock);
|
|
||||||
|
|
||||||
char tmpPath[PATH_MAX] = {0};
|
char tmpPath[PATH_MAX] = {0};
|
||||||
if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) {
|
if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) {
|
||||||
goto END;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
TdDirPtr pDir = taosOpenDir(tmpPath);
|
TdDirPtr pDir = taosOpenDir(tmpPath);
|
||||||
if (pDir == NULL) {
|
if (pDir == NULL) {
|
||||||
goto END;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
TdDirEntryPtr de = NULL;
|
TdDirEntryPtr de = NULL;
|
||||||
|
@ -296,18 +291,18 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||||
monitorReadSendSlowLog(pFile, pInst->pTransporter, &ep);
|
bool truncated = monitorReadSendSlowLog(pFile, pInst->pTransporter, &ep);
|
||||||
taosUnLockFile(pFile);
|
taosUnLockFile(pFile);
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
taosRemoveFile(filename);
|
|
||||||
|
if(truncated){
|
||||||
|
taosRemoveFile(filename);
|
||||||
|
}
|
||||||
uDebug("[monitor] send and delete slow log file when reveive connect rsp:%s", filename);
|
uDebug("[monitor] send and delete slow log file when reveive connect rsp:%s", filename);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosCloseDir(&pDir);
|
taosCloseDir(&pDir);
|
||||||
|
|
||||||
END:
|
|
||||||
taosRUnLockLatch(&monitorLock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void sendAllCounter(){
|
static void sendAllCounter(){
|
||||||
|
@ -449,37 +444,7 @@ static void monitorFreeSlowLogData(MonitorSlowLogData* pData) {
|
||||||
|
|
||||||
static void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); }
|
static void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); }
|
||||||
|
|
||||||
static void reportSlowLog(void* param, void* tmrId) {
|
|
||||||
taosWLockLatch(&monitorLock);
|
|
||||||
if (atomic_load_32(&monitorFlag) == 1) {
|
|
||||||
taosRUnLockLatch(&monitorLock);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
SAppInstInfo* pInst = getAppInstByClusterId((int64_t)param);
|
|
||||||
if(pInst == NULL){
|
|
||||||
uError("failed to get app inst, clusterId:%"PRIx64, (int64_t)param);
|
|
||||||
taosRUnLockLatch(&monitorLock);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
void* tmp = taosHashGet(monitorSlowLogHash, ¶m, LONG_BYTES);
|
|
||||||
if(tmp == NULL){
|
|
||||||
uError("failed to get file inst, clusterId:%"PRIx64, (int64_t)param);
|
|
||||||
taosRUnLockLatch(&monitorLock);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
|
||||||
monitorReadSendSlowLog((*(SlowLogClient**)tmp)->pFile, pInst->pTransporter, &ep);
|
|
||||||
|
|
||||||
if((*(SlowLogClient**)tmp)->timer == tmrId){
|
|
||||||
taosTmrReset(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &(*(SlowLogClient**)tmp)->timer);
|
|
||||||
}
|
|
||||||
taosWUnLockLatch(&monitorLock);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpPath){
|
static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpPath){
|
||||||
taosWLockLatch(&monitorLock);
|
|
||||||
TdFilePtr pFile = NULL;
|
TdFilePtr pFile = NULL;
|
||||||
void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES);
|
void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES);
|
||||||
if (tmp == NULL){
|
if (tmp == NULL){
|
||||||
|
@ -487,7 +452,7 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP
|
||||||
char clusterId[32] = {0};
|
char clusterId[32] = {0};
|
||||||
if (snprintf(clusterId, sizeof(clusterId), "%" PRIx64, slowLogData->clusterId) < 0){
|
if (snprintf(clusterId, sizeof(clusterId), "%" PRIx64, slowLogData->clusterId) < 0){
|
||||||
uError("failed to generate clusterId:%" PRIx64, slowLogData->clusterId);
|
uError("failed to generate clusterId:%" PRIx64, slowLogData->clusterId);
|
||||||
goto FAILED;
|
return;
|
||||||
}
|
}
|
||||||
taosGetTmpfilePath(tmpPath, clusterId, path);
|
taosGetTmpfilePath(tmpPath, clusterId, path);
|
||||||
uInfo("[monitor] create slow log file:%s", path);
|
uInfo("[monitor] create slow log file:%s", path);
|
||||||
|
@ -495,35 +460,35 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
uError("failed to open file:%s since %s", path, terrstr());
|
uError("failed to open file:%s since %s", path, terrstr());
|
||||||
goto FAILED;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SlowLogClient *pClient = taosMemoryCalloc(1, sizeof(SlowLogClient));
|
SlowLogClient *pClient = taosMemoryCalloc(1, sizeof(SlowLogClient));
|
||||||
if (pClient == NULL){
|
if (pClient == NULL){
|
||||||
uError("failed to allocate memory for slow log client");
|
uError("failed to allocate memory for slow log client");
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
goto FAILED;
|
return;
|
||||||
}
|
}
|
||||||
|
pClient->lastCheckTime = taosGetMonoTimestampMs();
|
||||||
|
strcpy(pClient->path, path);
|
||||||
pClient->pFile = pFile;
|
pClient->pFile = pFile;
|
||||||
if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0){
|
if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0){
|
||||||
uError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId);
|
uError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId);
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
taosMemoryFree(pClient);
|
taosMemoryFree(pClient);
|
||||||
goto FAILED;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(taosLockFile(pFile) < 0){
|
if(taosLockFile(pFile) < 0){
|
||||||
uError("failed to lock file:%p since %s", pFile, terrstr());
|
uError("failed to lock file:%p since %s", pFile, terrstr());
|
||||||
goto FAILED;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SAppInstInfo* pInst = getAppInstByClusterId(slowLogData->clusterId);
|
SAppInstInfo* pInst = getAppInstByClusterId(slowLogData->clusterId);
|
||||||
if(pInst == NULL){
|
if(pInst == NULL){
|
||||||
uError("failed to get app instance by clusterId:%" PRId64, slowLogData->clusterId);
|
uError("failed to get app instance by clusterId:%" PRId64, slowLogData->clusterId);
|
||||||
goto FAILED;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pClient->timer = taosTmrStart(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, (void*)slowLogData->clusterId, monitorTimer);
|
|
||||||
}else{
|
}else{
|
||||||
pFile = (*(SlowLogClient**)tmp)->pFile;
|
pFile = (*(SlowLogClient**)tmp)->pFile;
|
||||||
}
|
}
|
||||||
|
@ -532,26 +497,30 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP
|
||||||
uError("failed to write len to file:%p since %s", pFile, terrstr());
|
uError("failed to write len to file:%p since %s", pFile, terrstr());
|
||||||
}
|
}
|
||||||
uDebug("[monitor] write slow log to file:%p, clusterId:%"PRIx64, pFile, slowLogData->clusterId);
|
uDebug("[monitor] write slow log to file:%p, clusterId:%"PRIx64, pFile, slowLogData->clusterId);
|
||||||
|
|
||||||
FAILED:
|
|
||||||
taosWUnLockLatch(&monitorLock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void restartReportTimer(int64_t clusterId){
|
static void monitorSendAllSlowLog(bool quit){
|
||||||
taosWLockLatch(&monitorLock);
|
int64_t t = taosGetMonoTimestampMs();
|
||||||
|
|
||||||
void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
|
void* pIter = NULL;
|
||||||
if(tmp){
|
while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) {
|
||||||
taosTmrStopA(&(*(SlowLogClient**)tmp)->timer);
|
int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
|
||||||
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
|
SAppInstInfo* pInst = getAppInstByClusterId(*clusterId);
|
||||||
if(pInst == NULL){
|
bool truncated = false;
|
||||||
uError("failed to get app inst, clusterId:%"PRIx64, clusterId);
|
if(quit || (pInst != NULL && t - (*(SlowLogClient**)pIter)->lastCheckTime > pInst->monitorParas.tsMonitorInterval * 1000)) {
|
||||||
return;
|
(*(SlowLogClient**)pIter)->lastCheckTime = t;
|
||||||
|
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||||
|
truncated = monitorReadSendSlowLog((*(SlowLogClient**)pIter)->pFile, pInst->pTransporter, &ep);
|
||||||
}
|
}
|
||||||
(*(SlowLogClient**)tmp)->timer = taosTmrStart(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, (void*)clusterId, monitorTimer);
|
|
||||||
|
|
||||||
|
if(quit){
|
||||||
|
taosUnLockFile((*(SlowLogClient**)pIter)->pFile);
|
||||||
|
taosCloseFile(&((*(SlowLogClient**)pIter)->pFile));
|
||||||
|
if(truncated){
|
||||||
|
taosRemoveFile((*(SlowLogClient**)pIter)->path);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&monitorLock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void* monitorThreadFunc(void *param){
|
static void* monitorThreadFunc(void *param){
|
||||||
|
@ -590,7 +559,10 @@ static void* monitorThreadFunc(void *param){
|
||||||
}
|
}
|
||||||
uDebug("monitorThreadFunc start");
|
uDebug("monitorThreadFunc start");
|
||||||
while (1) {
|
while (1) {
|
||||||
if (slowLogFlag > 0) break;
|
if (slowLogFlag > 0) {
|
||||||
|
monitorSendAllSlowLog(true);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
MonitorSlowLogData* slowLogData = NULL;
|
MonitorSlowLogData* slowLogData = NULL;
|
||||||
taosReadQitem(monitorQueue, (void**)&slowLogData);
|
taosReadQitem(monitorQueue, (void**)&slowLogData);
|
||||||
|
@ -598,15 +570,15 @@ static void* monitorThreadFunc(void *param){
|
||||||
uDebug("[monitor] read slow log data from queue, clusterId:%" PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value);
|
uDebug("[monitor] read slow log data from queue, clusterId:%" PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value);
|
||||||
if (slowLogData->value == NULL){
|
if (slowLogData->value == NULL){
|
||||||
monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);
|
monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);
|
||||||
} else if(strlen(slowLogData->value) == 0){
|
|
||||||
restartReportTimer(slowLogData->clusterId);
|
|
||||||
} else{
|
} else{
|
||||||
monitorWriteSlowLog2File(slowLogData, tmpPath);
|
monitorWriteSlowLog2File(slowLogData, tmpPath);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
monitorFreeSlowLogData(slowLogData);
|
monitorFreeSlowLogData(slowLogData);
|
||||||
taosFreeQitem(slowLogData);
|
taosFreeQitem(slowLogData);
|
||||||
tsem2_timewait(&monitorSem, 500);
|
|
||||||
|
monitorSendAllSlowLog(false);
|
||||||
|
tsem2_timewait(&monitorSem, 100);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosCloseQueue(monitorQueue);
|
taosCloseQueue(monitorQueue);
|
||||||
|
@ -671,7 +643,6 @@ void monitorClose() {
|
||||||
uDebug("[monitor] monitorFlag is not 0");
|
uDebug("[monitor] monitorFlag is not 0");
|
||||||
}
|
}
|
||||||
tscMonitorStop();
|
tscMonitorStop();
|
||||||
sendAllSlowLog();
|
|
||||||
sendAllCounter();
|
sendAllCounter();
|
||||||
taosHashCleanup(monitorCounterHash);
|
taosHashCleanup(monitorCounterHash);
|
||||||
taosHashCleanup(monitorSlowLogHash);
|
taosHashCleanup(monitorSlowLogHash);
|
||||||
|
|
Loading…
Reference in New Issue