fix:[TS-4921]refactor reporting logic for slow log
This commit is contained in:
parent
4739607b91
commit
2dad4d0053
|
@ -1,4 +1,5 @@
|
||||||
#include "clientMonitor.h"
|
#include "clientMonitor.h"
|
||||||
|
#include "clientLog.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "tmisce.h"
|
#include "tmisce.h"
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
|
@ -24,7 +25,7 @@ static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size){
|
||||||
}
|
}
|
||||||
int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir);
|
int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir);
|
||||||
if (ret < 0){
|
if (ret < 0){
|
||||||
uError("failed to get tmp path ret:%d", ret);
|
tscError("failed to get tmp path ret:%d", ret);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -78,7 +79,7 @@ static void monitorFreeSlowLogData(void *paras) {
|
||||||
static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) {
|
static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) {
|
||||||
void *p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES);
|
void *p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES);
|
||||||
if(p == NULL){
|
if(p == NULL){
|
||||||
uError("failed to get app inst, clusterId:%" PRIx64, clusterId);
|
tscError("failed to get app inst, clusterId:%" PRIx64, clusterId);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
return *(SAppInstInfo**)p;
|
return *(SAppInstInfo**)p;
|
||||||
|
@ -86,7 +87,7 @@ static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) {
|
||||||
|
|
||||||
static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
|
static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
uError("found error in monitorReport send callback, code:%d, please check the network.", code);
|
tscError("found error in monitorReport send callback, code:%d, please check the network.", code);
|
||||||
}
|
}
|
||||||
if (pMsg) {
|
if (pMsg) {
|
||||||
taosMemoryFree(pMsg->pData);
|
taosMemoryFree(pMsg->pData);
|
||||||
|
@ -95,7 +96,7 @@ static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
if(param != NULL){
|
if(param != NULL){
|
||||||
MonitorSlowLogData* p = (MonitorSlowLogData*)param;
|
MonitorSlowLogData* p = (MonitorSlowLogData*)param;
|
||||||
if(code != 0){
|
if(code != 0){
|
||||||
uError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId);
|
tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId);
|
||||||
}
|
}
|
||||||
if(monitorPutData2MonitorQueue(*p) == 0){
|
if(monitorPutData2MonitorQueue(*p) == 0){
|
||||||
p->fileName = NULL;
|
p->fileName = NULL;
|
||||||
|
@ -119,7 +120,7 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO
|
||||||
}
|
}
|
||||||
void* buf = taosMemoryMalloc(tlen);
|
void* buf = taosMemoryMalloc(tlen);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
uError("sendReport failed, out of memory, len:%d", tlen);
|
tscError("sendReport failed, out of memory, len:%d", tlen);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto FAILED;
|
goto FAILED;
|
||||||
}
|
}
|
||||||
|
@ -127,7 +128,7 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO
|
||||||
|
|
||||||
SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
if (pInfo == NULL) {
|
if (pInfo == NULL) {
|
||||||
uError("sendReport failed, out of memory send info");
|
tscError("sendReport failed, out of memory send info");
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
goto FAILED;
|
goto FAILED;
|
||||||
|
@ -147,7 +148,7 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO
|
||||||
}
|
}
|
||||||
|
|
||||||
FAILED:
|
FAILED:
|
||||||
uError("sendReport failed, code:%d", code);
|
tscError("sendReport failed, code:%d", code);
|
||||||
monitorFreeSlowLogData(param);
|
monitorFreeSlowLogData(param);
|
||||||
taosMemoryFree(param);
|
taosMemoryFree(param);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -158,7 +159,7 @@ static void generateClusterReport(taos_collector_registry_t* registry, void* pTr
|
||||||
sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI));
|
sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI));
|
||||||
char* pCont = (char*)taos_collector_registry_bridge_new(registry, ts, "%" PRId64, NULL);
|
char* pCont = (char*)taos_collector_registry_bridge_new(registry, ts, "%" PRId64, NULL);
|
||||||
if(NULL == pCont) {
|
if(NULL == pCont) {
|
||||||
uError("generateClusterReport failed, get null content.");
|
tscError("generateClusterReport failed, get null content.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -209,58 +210,58 @@ void monitorCreateClient(int64_t clusterId) {
|
||||||
MonitorClient* pMonitor = NULL;
|
MonitorClient* pMonitor = NULL;
|
||||||
taosWLockLatch(&monitorLock);
|
taosWLockLatch(&monitorLock);
|
||||||
if (taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES) == NULL) {
|
if (taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES) == NULL) {
|
||||||
uInfo("[monitor] monitorCreateClient for %" PRIx64, clusterId);
|
tscInfo("[monitor] monitorCreateClient for %" PRIx64, clusterId);
|
||||||
pMonitor = taosMemoryCalloc(1, sizeof(MonitorClient));
|
pMonitor = taosMemoryCalloc(1, sizeof(MonitorClient));
|
||||||
if (pMonitor == NULL) {
|
if (pMonitor == NULL) {
|
||||||
uError("failed to create monitor client");
|
tscError("failed to create monitor client");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
pMonitor->clusterId = clusterId;
|
pMonitor->clusterId = clusterId;
|
||||||
char clusterKey[32] = {0};
|
char clusterKey[32] = {0};
|
||||||
if(snprintf(clusterKey, sizeof(clusterKey), "%"PRId64, clusterId) < 0){
|
if(snprintf(clusterKey, sizeof(clusterKey), "%"PRId64, clusterId) < 0){
|
||||||
uError("failed to create cluster key");
|
tscError("failed to create cluster key");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
pMonitor->registry = taos_collector_registry_new(clusterKey);
|
pMonitor->registry = taos_collector_registry_new(clusterKey);
|
||||||
if(pMonitor->registry == NULL){
|
if(pMonitor->registry == NULL){
|
||||||
uError("failed to create registry");
|
tscError("failed to create registry");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
pMonitor->colector = taos_collector_new(clusterKey);
|
pMonitor->colector = taos_collector_new(clusterKey);
|
||||||
if(pMonitor->colector == NULL){
|
if(pMonitor->colector == NULL){
|
||||||
uError("failed to create collector");
|
tscError("failed to create collector");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector);
|
taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector);
|
||||||
pMonitor->counters = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
pMonitor->counters = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
if (pMonitor->counters == NULL) {
|
if (pMonitor->counters == NULL) {
|
||||||
uError("failed to create monitor counters");
|
tscError("failed to create monitor counters");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
// taosHashSetFreeFp(pMonitor->counters, destroyCounter);
|
// taosHashSetFreeFp(pMonitor->counters, destroyCounter);
|
||||||
|
|
||||||
if(taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0){
|
if(taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0){
|
||||||
uError("failed to put monitor client to hash");
|
tscError("failed to put monitor client to hash");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
|
|
||||||
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
|
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
|
||||||
if(pInst == NULL){
|
if(pInst == NULL){
|
||||||
uError("failed to get app instance by cluster id");
|
tscError("failed to get app instance by cluster id");
|
||||||
pMonitor = NULL;
|
pMonitor = NULL;
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
pMonitor->timer = taosTmrStart(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, (void*)pMonitor, monitorTimer);
|
pMonitor->timer = taosTmrStart(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, (void*)pMonitor, monitorTimer);
|
||||||
if(pMonitor->timer == NULL){
|
if(pMonitor->timer == NULL){
|
||||||
uError("failed to start timer");
|
tscError("failed to start timer");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
uInfo("[monitor] monitorCreateClient for %"PRIx64 "finished %p.", clusterId, pMonitor);
|
tscInfo("[monitor] monitorCreateClient for %"PRIx64 "finished %p.", clusterId, pMonitor);
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&monitorLock);
|
taosWUnLockLatch(&monitorLock);
|
||||||
if (-1 != atomic_val_compare_exchange_32(&monitorFlag, -1, 0)) {
|
if (-1 != atomic_val_compare_exchange_32(&monitorFlag, -1, 0)) {
|
||||||
uDebug("[monitor] monitorFlag already is 0");
|
tscDebug("[monitor] monitorFlag already is 0");
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
@ -273,7 +274,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
|
||||||
taosWLockLatch(&monitorLock);
|
taosWLockLatch(&monitorLock);
|
||||||
MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
|
MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
|
||||||
if (ppMonitor == NULL || *ppMonitor == NULL) {
|
if (ppMonitor == NULL || *ppMonitor == NULL) {
|
||||||
uError("failed to get monitor client");
|
tscError("failed to get monitor client");
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys);
|
taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys);
|
||||||
|
@ -282,11 +283,11 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
|
||||||
MonitorClient* pMonitor = *ppMonitor;
|
MonitorClient* pMonitor = *ppMonitor;
|
||||||
taos_collector_add_metric(pMonitor->colector, newCounter);
|
taos_collector_add_metric(pMonitor->colector, newCounter);
|
||||||
if(taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0){
|
if(taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0){
|
||||||
uError("failed to put counter to monitor");
|
tscError("failed to put counter to monitor");
|
||||||
taos_counter_destroy(newCounter);
|
taos_counter_destroy(newCounter);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
uInfo("[monitor] monitorCreateClientCounter %"PRIx64"(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, newCounter);
|
tscInfo("[monitor] monitorCreateClientCounter %"PRIx64"(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, newCounter);
|
||||||
|
|
||||||
end:
|
end:
|
||||||
taosWUnLockLatch(&monitorLock);
|
taosWUnLockLatch(&monitorLock);
|
||||||
|
@ -296,18 +297,18 @@ void monitorCounterInc(int64_t clusterId, const char* counterName, const char**
|
||||||
taosWLockLatch(&monitorLock);
|
taosWLockLatch(&monitorLock);
|
||||||
MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
|
MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
|
||||||
if (ppMonitor == NULL || *ppMonitor == NULL) {
|
if (ppMonitor == NULL || *ppMonitor == NULL) {
|
||||||
uError("monitorCounterInc not found pMonitor %"PRId64, clusterId);
|
tscError("monitorCounterInc not found pMonitor %"PRId64, clusterId);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
MonitorClient* pMonitor = *ppMonitor;
|
MonitorClient* pMonitor = *ppMonitor;
|
||||||
taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName));
|
taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName));
|
||||||
if (ppCounter == NULL || *ppCounter == NULL) {
|
if (ppCounter == NULL || *ppCounter == NULL) {
|
||||||
uError("monitorCounterInc not found pCounter %"PRIx64":%s.", clusterId, counterName);
|
tscError("monitorCounterInc not found pCounter %"PRIx64":%s.", clusterId, counterName);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
taos_counter_inc(*ppCounter, label_values);
|
taos_counter_inc(*ppCounter, label_values);
|
||||||
uInfo("[monitor] monitorCounterInc %"PRIx64"(%p):%s", pMonitor->clusterId, pMonitor, counterName);
|
tscInfo("[monitor] monitorCounterInc %"PRIx64"(%p):%s", pMonitor->clusterId, pMonitor, counterName);
|
||||||
|
|
||||||
end:
|
end:
|
||||||
taosWUnLockLatch(&monitorLock);
|
taosWUnLockLatch(&monitorLock);
|
||||||
|
@ -327,21 +328,21 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP
|
||||||
char path[PATH_MAX] = {0};
|
char path[PATH_MAX] = {0};
|
||||||
char clusterId[32] = {0};
|
char clusterId[32] = {0};
|
||||||
if (snprintf(clusterId, sizeof(clusterId), "%" PRIx64, slowLogData->clusterId) < 0){
|
if (snprintf(clusterId, sizeof(clusterId), "%" PRIx64, slowLogData->clusterId) < 0){
|
||||||
uError("failed to generate clusterId:%" PRIx64, slowLogData->clusterId);
|
tscError("failed to generate clusterId:%" PRIx64, slowLogData->clusterId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
taosGetTmpfilePath(tmpPath, clusterId, path);
|
taosGetTmpfilePath(tmpPath, clusterId, path);
|
||||||
uInfo("[monitor] create slow log file:%s", path);
|
tscInfo("[monitor] create slow log file:%s", path);
|
||||||
pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
|
pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
uError("failed to open file:%s since %s", path, terrstr());
|
tscError("failed to open file:%s since %s", path, terrstr());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SlowLogClient *pClient = taosMemoryCalloc(1, sizeof(SlowLogClient));
|
SlowLogClient *pClient = taosMemoryCalloc(1, sizeof(SlowLogClient));
|
||||||
if (pClient == NULL){
|
if (pClient == NULL){
|
||||||
uError("failed to allocate memory for slow log client");
|
tscError("failed to allocate memory for slow log client");
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -350,14 +351,14 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP
|
||||||
pClient->offset = 0;
|
pClient->offset = 0;
|
||||||
pClient->pFile = pFile;
|
pClient->pFile = pFile;
|
||||||
if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0){
|
if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0){
|
||||||
uError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId);
|
tscError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId);
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
taosMemoryFree(pClient);
|
taosMemoryFree(pClient);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(taosLockFile(pFile) < 0){
|
if(taosLockFile(pFile) < 0){
|
||||||
uError("failed to lock file:%p since %s", pFile, terrstr());
|
tscError("failed to lock file:%p since %s", pFile, terrstr());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}else{
|
}else{
|
||||||
|
@ -365,19 +366,19 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP
|
||||||
}
|
}
|
||||||
|
|
||||||
if(taosLSeekFile(pFile, 0, SEEK_END) < 0){
|
if(taosLSeekFile(pFile, 0, SEEK_END) < 0){
|
||||||
uError("failed to seek file:%p code: %d", pFile, errno);
|
tscError("failed to seek file:%p code: %d", pFile, errno);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (taosWriteFile(pFile, slowLogData->data, strlen(slowLogData->data) + 1) < 0){
|
if (taosWriteFile(pFile, slowLogData->data, strlen(slowLogData->data) + 1) < 0){
|
||||||
uError("failed to write len to file:%p since %s", pFile, terrstr());
|
tscError("failed to write len to file:%p since %s", pFile, terrstr());
|
||||||
}
|
}
|
||||||
uDebug("[monitor] write slow log to file:%p, clusterId:%"PRIx64, pFile, slowLogData->clusterId);
|
tscDebug("[monitor] write slow log to file:%p, clusterId:%"PRIx64, pFile, slowLogData->clusterId);
|
||||||
}
|
}
|
||||||
|
|
||||||
static char* readFile(TdFilePtr pFile, int64_t *offset, int64_t size){
|
static char* readFile(TdFilePtr pFile, int64_t *offset, int64_t size){
|
||||||
uDebug("[monitor] readFile slow begin pFile:%p, offset:%"PRId64 ", size:%"PRId64, pFile, *offset, size);
|
tscDebug("[monitor] readFile slow begin pFile:%p, offset:%"PRId64 ", size:%"PRId64, pFile, *offset, size);
|
||||||
if(taosLSeekFile(pFile, *offset, SEEK_SET) < 0){
|
if(taosLSeekFile(pFile, *offset, SEEK_SET) < 0){
|
||||||
uError("failed to seek file:%p code: %d", pFile, errno);
|
tscError("failed to seek file:%p code: %d", pFile, errno);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -393,7 +394,7 @@ static char* readFile(TdFilePtr pFile, int64_t *offset, int64_t size){
|
||||||
}
|
}
|
||||||
|
|
||||||
if(pCont == NULL){
|
if(pCont == NULL){
|
||||||
uError("failed to allocate memory for slow log, size:%" PRId64, totalSize);
|
tscError("failed to allocate memory for slow log, size:%" PRId64, totalSize);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
char* buf = pCont;
|
char* buf = pCont;
|
||||||
|
@ -401,7 +402,7 @@ static char* readFile(TdFilePtr pFile, int64_t *offset, int64_t size){
|
||||||
int64_t readSize = taosReadFile(pFile, buf, SLOW_LOG_SEND_SIZE_MAX);
|
int64_t readSize = taosReadFile(pFile, buf, SLOW_LOG_SEND_SIZE_MAX);
|
||||||
if (readSize <= 0) {
|
if (readSize <= 0) {
|
||||||
if (readSize < 0){
|
if (readSize < 0){
|
||||||
uError("failed to read len from file:%p since %s", pFile, terrstr());
|
tscError("failed to read len from file:%p since %s", pFile, terrstr());
|
||||||
}
|
}
|
||||||
taosMemoryFree(pCont);
|
taosMemoryFree(pCont);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -421,7 +422,7 @@ static char* readFile(TdFilePtr pFile, int64_t *offset, int64_t size){
|
||||||
*offset += (len+1);
|
*offset += (len+1);
|
||||||
}
|
}
|
||||||
|
|
||||||
uDebug("[monitor] readFile slow log end, data:%s, offset:%"PRId64, pCont, *offset);
|
tscDebug("[monitor] readFile slow log end, data:%s, offset:%"PRId64, pCont, *offset);
|
||||||
return pCont;
|
return pCont;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -456,13 +457,13 @@ static void monitorSendSlowLogAtBeginning(int64_t clusterId, char* fileName, TdF
|
||||||
taosUnLockFile(pFile);
|
taosUnLockFile(pFile);
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
taosRemoveFile(fileName);
|
taosRemoveFile(fileName);
|
||||||
uDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", fileName);
|
tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", fileName);
|
||||||
}else{
|
}else{
|
||||||
char* data = readFile(pFile, &offset, size);
|
char* data = readFile(pFile, &offset, size);
|
||||||
if(data != NULL){
|
if(data != NULL){
|
||||||
sendSlowLog(clusterId, data, pFile, offset, SLOW_LOG_READ_BEGINNIG, taosStrdup(fileName), pTransporter, epSet);
|
sendSlowLog(clusterId, data, pFile, offset, SLOW_LOG_READ_BEGINNIG, taosStrdup(fileName), pTransporter, epSet);
|
||||||
}
|
}
|
||||||
uDebug("[monitor] monitorSendSlowLogAtBeginning send slow log file:%p, data:%s", pFile, data);
|
tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log file:%p, data:%s", pFile, data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -478,14 +479,14 @@ static void monitorSendSlowLogAtRunning(int64_t clusterId){
|
||||||
int64_t size = getFileSize(pClient->path);
|
int64_t size = getFileSize(pClient->path);
|
||||||
if(size <= pClient->offset){
|
if(size <= pClient->offset){
|
||||||
if(taosFtruncateFile(pClient->pFile, 0) < 0){
|
if(taosFtruncateFile(pClient->pFile, 0) < 0){
|
||||||
uError("failed to truncate file:%p code: %d", pClient->pFile, errno);
|
tscError("failed to truncate file:%p code: %d", pClient->pFile, errno);
|
||||||
}
|
}
|
||||||
uDebug("[monitor] monitorSendSlowLogAtRunning truncate file to 0 file:%p", pClient->pFile);
|
tscDebug("[monitor] monitorSendSlowLogAtRunning truncate file to 0 file:%p", pClient->pFile);
|
||||||
pClient->offset = 0;
|
pClient->offset = 0;
|
||||||
}else{
|
}else{
|
||||||
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
|
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
|
||||||
if(pInst == NULL){
|
if(pInst == NULL){
|
||||||
uError("failed to get app instance by clusterId:%" PRId64, clusterId);
|
tscError("failed to get app instance by clusterId:%" PRId64, clusterId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||||
|
@ -493,7 +494,7 @@ static void monitorSendSlowLogAtRunning(int64_t clusterId){
|
||||||
if(data != NULL){
|
if(data != NULL){
|
||||||
sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep);
|
sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep);
|
||||||
}
|
}
|
||||||
uDebug("[monitor] monitorSendSlowLogAtRunning send slow log:%s", data);
|
tscDebug("[monitor] monitorSendSlowLogAtRunning send slow log:%s", data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -511,7 +512,7 @@ static bool monitorSendSlowLogAtQuit(int64_t clusterId) {
|
||||||
taosUnLockFile(pClient->pFile);
|
taosUnLockFile(pClient->pFile);
|
||||||
taosCloseFile(&(pClient->pFile));
|
taosCloseFile(&(pClient->pFile));
|
||||||
taosRemoveFile(pClient->path);
|
taosRemoveFile(pClient->path);
|
||||||
uInfo("[monitor] monitorSendSlowLogAtQuit remove file:%s", pClient->path);
|
tscInfo("[monitor] monitorSendSlowLogAtQuit remove file:%s", pClient->path);
|
||||||
if((--quitCnt) == 0){
|
if((--quitCnt) == 0){
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -525,7 +526,7 @@ static bool monitorSendSlowLogAtQuit(int64_t clusterId) {
|
||||||
if(data != NULL){
|
if(data != NULL){
|
||||||
sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep);
|
sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep);
|
||||||
}
|
}
|
||||||
uInfo("[monitor] monitorSendSlowLogAtQuit send slow log:%s", data);
|
tscInfo("[monitor] monitorSendSlowLogAtQuit send slow log:%s", data);
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -554,7 +555,7 @@ static void monitorSendAllSlowLogAtQuit(){
|
||||||
if(data != NULL && sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep) == 0){
|
if(data != NULL && sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep) == 0){
|
||||||
quitCnt ++;
|
quitCnt ++;
|
||||||
}
|
}
|
||||||
uInfo("[monitor] monitorSendAllSlowLogAtQuit send slow log :%s", data);
|
tscInfo("[monitor] monitorSendAllSlowLogAtQuit send slow log :%s", data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -566,7 +567,7 @@ static void processFileRemoved(SlowLogClient* pClient){
|
||||||
TdFilePtr pFile = taosOpenFile(pClient->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
|
TdFilePtr pFile = taosOpenFile(pClient->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
uError("failed to open file:%s since %s", pClient->path, terrstr());
|
tscError("failed to open file:%s since %s", pClient->path, terrstr());
|
||||||
}else{
|
}else{
|
||||||
pClient->pFile = pFile;
|
pClient->pFile = pFile;
|
||||||
}
|
}
|
||||||
|
@ -593,7 +594,7 @@ static void monitorSendAllSlowLog(){
|
||||||
int64_t size = getFileSize(pClient->path);
|
int64_t size = getFileSize(pClient->path);
|
||||||
if(size <= 0){
|
if(size <= 0){
|
||||||
if(size < 0){
|
if(size < 0){
|
||||||
uError("[monitor] monitorSendAllSlowLog failed to get file size:%s, err:%d", pClient->path, errno);
|
tscError("[monitor] monitorSendAllSlowLog failed to get file size:%s, err:%d", pClient->path, errno);
|
||||||
if(errno == ENOENT){
|
if(errno == ENOENT){
|
||||||
processFileRemoved(pClient);
|
processFileRemoved(pClient);
|
||||||
}
|
}
|
||||||
|
@ -605,7 +606,7 @@ static void monitorSendAllSlowLog(){
|
||||||
if(data != NULL){
|
if(data != NULL){
|
||||||
sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep);
|
sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep);
|
||||||
}
|
}
|
||||||
uDebug("[monitor] monitorSendAllSlowLog send slow log :%s", data);
|
tscDebug("[monitor] monitorSendAllSlowLog send slow log :%s", data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -614,12 +615,12 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
|
||||||
SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId);
|
SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId);
|
||||||
|
|
||||||
if(pInst == NULL || !pInst->monitorParas.tsEnableMonitor){
|
if(pInst == NULL || !pInst->monitorParas.tsEnableMonitor){
|
||||||
uInfo("[monitor] monitor is disabled, skip send slow log");
|
tscInfo("[monitor] monitor is disabled, skip send slow log");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
char namePrefix[PATH_MAX] = {0};
|
char namePrefix[PATH_MAX] = {0};
|
||||||
if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, pInst->clusterId) < 0) {
|
if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, pInst->clusterId) < 0) {
|
||||||
uError("failed to generate slow log file name prefix");
|
tscError("failed to generate slow log file name prefix");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -643,7 +644,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
|
||||||
if (strcmp(name, ".") == 0 ||
|
if (strcmp(name, ".") == 0 ||
|
||||||
strcmp(name, "..") == 0 ||
|
strcmp(name, "..") == 0 ||
|
||||||
strstr(name, namePrefix) == NULL) {
|
strstr(name, namePrefix) == NULL) {
|
||||||
uInfo("skip file:%s, for cluster id:%"PRIx64, name, pInst->clusterId);
|
tscInfo("skip file:%s, for cluster id:%"PRIx64, name, pInst->clusterId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -651,11 +652,11 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
|
||||||
snprintf(filename, sizeof(filename), "%s%s", tmpPath, name);
|
snprintf(filename, sizeof(filename), "%s%s", tmpPath, name);
|
||||||
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_WRITE);
|
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_WRITE);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
uError("failed to open file:%s since %s", filename, terrstr());
|
tscError("failed to open file:%s since %s", filename, terrstr());
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (taosLockFile(pFile) < 0) {
|
if (taosLockFile(pFile) < 0) {
|
||||||
uError("failed to lock file:%s since %s, maybe used by other process", filename, terrstr());
|
tscError("failed to lock file:%s since %s, maybe used by other process", filename, terrstr());
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -687,33 +688,33 @@ static void* monitorThreadFunc(void *param){
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsem2_init(&monitorSem, 0, 0) != 0) {
|
if (tsem2_init(&monitorSem, 0, 0) != 0) {
|
||||||
uError("sem init error since %s", terrstr());
|
tscError("sem init error since %s", terrstr());
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
monitorQueue = taosOpenQueue();
|
monitorQueue = taosOpenQueue();
|
||||||
if(monitorQueue == NULL){
|
if(monitorQueue == NULL){
|
||||||
uError("open queue error since %s", terrstr());
|
tscError("open queue error since %s", terrstr());
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) {
|
if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
uDebug("monitorThreadFunc start");
|
tscDebug("monitorThreadFunc start");
|
||||||
int64_t quitTime = 0;
|
int64_t quitTime = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
if (slowLogFlag > 0) {
|
if (slowLogFlag > 0) {
|
||||||
if(quitCnt == 0){
|
if(quitCnt == 0){
|
||||||
monitorSendAllSlowLogAtQuit();
|
monitorSendAllSlowLogAtQuit();
|
||||||
if(quitCnt == 0){
|
if(quitCnt == 0){
|
||||||
uInfo("monitorThreadFunc quit since no slow log to send");
|
tscInfo("monitorThreadFunc quit since no slow log to send");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
quitTime = taosGetMonoTimestampMs();
|
quitTime = taosGetMonoTimestampMs();
|
||||||
}
|
}
|
||||||
if(taosGetMonoTimestampMs() - quitTime > 500){ //quit at most 500ms
|
if(taosGetMonoTimestampMs() - quitTime > 500){ //quit at most 500ms
|
||||||
uInfo("monitorThreadFunc quit since timeout")
|
tscInfo("monitorThreadFunc quit since timeout");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -737,7 +738,7 @@ static void* monitorThreadFunc(void *param){
|
||||||
monitorSendSlowLogAtRunning(slowLogData->clusterId);
|
monitorSendSlowLogAtRunning(slowLogData->clusterId);
|
||||||
} else if(slowLogData->type == SLOW_LOG_READ_QUIT){
|
} else if(slowLogData->type == SLOW_LOG_READ_QUIT){
|
||||||
if(monitorSendSlowLogAtQuit(slowLogData->clusterId)){
|
if(monitorSendSlowLogAtQuit(slowLogData->clusterId)){
|
||||||
uInfo("monitorThreadFunc quit since all slow log sended");
|
tscInfo("monitorThreadFunc quit since all slow log sended");
|
||||||
monitorFreeSlowLogData(slowLogData);
|
monitorFreeSlowLogData(slowLogData);
|
||||||
taosFreeQitem(slowLogData);
|
taosFreeQitem(slowLogData);
|
||||||
break;
|
break;
|
||||||
|
@ -765,7 +766,7 @@ static int32_t tscMonitortInit() {
|
||||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
TdThread monitorThread;
|
TdThread monitorThread;
|
||||||
if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) {
|
if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) {
|
||||||
uError("failed to create monitor thread since %s", strerror(errno));
|
tscError("failed to create monitor thread since %s", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -775,7 +776,7 @@ static int32_t tscMonitortInit() {
|
||||||
|
|
||||||
static void tscMonitorStop() {
|
static void tscMonitorStop() {
|
||||||
if (atomic_val_compare_exchange_32(&slowLogFlag, 0, 1)) {
|
if (atomic_val_compare_exchange_32(&slowLogFlag, 0, 1)) {
|
||||||
uDebug("monitor thread already stopped");
|
tscDebug("monitor thread already stopped");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -785,22 +786,22 @@ static void tscMonitorStop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void monitorInit() {
|
void monitorInit() {
|
||||||
uInfo("[monitor] tscMonitor init");
|
tscInfo("[monitor] tscMonitor init");
|
||||||
monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||||
if (monitorCounterHash == NULL) {
|
if (monitorCounterHash == NULL) {
|
||||||
uError("failed to create monitorCounterHash");
|
tscError("failed to create monitorCounterHash");
|
||||||
}
|
}
|
||||||
taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient);
|
taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient);
|
||||||
|
|
||||||
monitorSlowLogHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
monitorSlowLogHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||||
if (monitorSlowLogHash == NULL) {
|
if (monitorSlowLogHash == NULL) {
|
||||||
uError("failed to create monitorSlowLogHash");
|
tscError("failed to create monitorSlowLogHash");
|
||||||
}
|
}
|
||||||
taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient);
|
taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient);
|
||||||
|
|
||||||
monitorTimer = taosTmrInit(0, 0, 0, "MONITOR");
|
monitorTimer = taosTmrInit(0, 0, 0, "MONITOR");
|
||||||
if (monitorTimer == NULL) {
|
if (monitorTimer == NULL) {
|
||||||
uError("failed to create monitor timer");
|
tscError("failed to create monitor timer");
|
||||||
}
|
}
|
||||||
|
|
||||||
taosInitRWLatch(&monitorLock);
|
taosInitRWLatch(&monitorLock);
|
||||||
|
@ -808,11 +809,11 @@ void monitorInit() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void monitorClose() {
|
void monitorClose() {
|
||||||
uInfo("[monitor] tscMonitor close");
|
tscInfo("[monitor] tscMonitor close");
|
||||||
taosWLockLatch(&monitorLock);
|
taosWLockLatch(&monitorLock);
|
||||||
|
|
||||||
if (atomic_val_compare_exchange_32(&monitorFlag, 0, 1)) {
|
if (atomic_val_compare_exchange_32(&monitorFlag, 0, 1)) {
|
||||||
uDebug("[monitor] monitorFlag is not 0");
|
tscDebug("[monitor] monitorFlag is not 0");
|
||||||
}
|
}
|
||||||
tscMonitorStop();
|
tscMonitorStop();
|
||||||
sendAllCounter();
|
sendAllCounter();
|
||||||
|
@ -825,11 +826,11 @@ void monitorClose() {
|
||||||
int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data){
|
int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data){
|
||||||
MonitorSlowLogData* slowLogData = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0);
|
MonitorSlowLogData* slowLogData = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0);
|
||||||
if (slowLogData == NULL) {
|
if (slowLogData == NULL) {
|
||||||
uError("[monitor] failed to allocate slow log data");
|
tscError("[monitor] failed to allocate slow log data");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
*slowLogData = data;
|
*slowLogData = data;
|
||||||
uDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%d", slowLogData->clusterId, slowLogData->type);
|
tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%d", slowLogData->clusterId, slowLogData->type);
|
||||||
while (atomic_load_32(&slowLogFlag) == -1) {
|
while (atomic_load_32(&slowLogFlag) == -1) {
|
||||||
taosMsleep(5);
|
taosMsleep(5);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue