Merge pull request #26411 from taosdata/fix/TS-4921
fix:[TS-4921] send data to queue error if monitor thread starts later or failed
This commit is contained in:
commit
d544d3628d
|
@ -38,6 +38,13 @@ typedef enum {
|
||||||
SLOW_LOG_READ_QUIT = 3,
|
SLOW_LOG_READ_QUIT = 3,
|
||||||
} SLOW_LOG_QUEUE_TYPE;
|
} SLOW_LOG_QUEUE_TYPE;
|
||||||
|
|
||||||
|
static char* queueTypeStr[] = {
|
||||||
|
"SLOW_LOG_WRITE",
|
||||||
|
"SLOW_LOG_READ_RUNNING",
|
||||||
|
"SLOW_LOG_READ_BEGINNIG",
|
||||||
|
"SLOW_LOG_READ_QUIT"
|
||||||
|
};
|
||||||
|
|
||||||
#define SLOW_LOG_SEND_SIZE_MAX 1024*1024
|
#define SLOW_LOG_SEND_SIZE_MAX 1024*1024
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -65,7 +72,7 @@ typedef struct {
|
||||||
} MonitorSlowLogData;
|
} MonitorSlowLogData;
|
||||||
|
|
||||||
void monitorClose();
|
void monitorClose();
|
||||||
void monitorInit();
|
int32_t monitorInit();
|
||||||
|
|
||||||
void monitorClientSQLReqInit(int64_t clusterKey);
|
void monitorClientSQLReqInit(int64_t clusterKey);
|
||||||
void monitorClientSlowQueryInit(int64_t clusterId);
|
void monitorClientSlowQueryInit(int64_t clusterId);
|
||||||
|
|
|
@ -864,10 +864,15 @@ void taos_init_imp(void) {
|
||||||
initQueryModuleMsgHandle();
|
initQueryModuleMsgHandle();
|
||||||
|
|
||||||
if (taosConvInit() != 0) {
|
if (taosConvInit() != 0) {
|
||||||
|
tscInitRes = -1;
|
||||||
tscError("failed to init conv");
|
tscError("failed to init conv");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (monitorInit() != 0){
|
||||||
|
tscInitRes = -1;
|
||||||
|
tscError("failed to init monitor");
|
||||||
|
return;
|
||||||
|
}
|
||||||
rpcInit();
|
rpcInit();
|
||||||
|
|
||||||
SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
|
SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
|
||||||
|
@ -891,7 +896,6 @@ void taos_init_imp(void) {
|
||||||
taosThreadMutexInit(&appInfo.mutex, NULL);
|
taosThreadMutexInit(&appInfo.mutex, NULL);
|
||||||
|
|
||||||
tscCrashReportInit();
|
tscCrashReportInit();
|
||||||
monitorInit();
|
|
||||||
|
|
||||||
tscDebug("client is initialized successfully");
|
tscDebug("client is initialized successfully");
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ int32_t quitCnt = 0;
|
||||||
tsem2_t monitorSem;
|
tsem2_t monitorSem;
|
||||||
STaosQueue* monitorQueue;
|
STaosQueue* monitorQueue;
|
||||||
SHashObj* monitorSlowLogHash;
|
SHashObj* monitorSlowLogHash;
|
||||||
|
char tmpSlowLogPath[PATH_MAX] = {0};
|
||||||
|
|
||||||
static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size){
|
static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size){
|
||||||
if (tsTempDir == NULL) {
|
if (tsTempDir == NULL) {
|
||||||
|
@ -453,6 +454,10 @@ static int64_t getFileSize(char* path){
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type, char* fileName, void* pTransporter, SEpSet *epSet){
|
static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type, char* fileName, void* pTransporter, SEpSet *epSet){
|
||||||
|
if (data == NULL){
|
||||||
|
taosMemoryFree(fileName);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData));
|
MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData));
|
||||||
if(pParam == NULL){
|
if(pParam == NULL){
|
||||||
taosMemoryFree(data);
|
taosMemoryFree(data);
|
||||||
|
@ -468,18 +473,26 @@ static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64
|
||||||
return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam);
|
return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset, void* pTransporter, SEpSet *epSet){
|
static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offset, int64_t size, SLOW_LOG_QUEUE_TYPE type, char* fileName){
|
||||||
|
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
|
||||||
|
if(pInst == NULL){
|
||||||
|
tscError("failed to get app instance by clusterId:%" PRId64, clusterId);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||||
|
char* data = readFile(pFile, offset, size);
|
||||||
|
return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName, pInst->pTransporter, &ep);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset){
|
||||||
int64_t size = getFileSize(*fileName);
|
int64_t size = getFileSize(*fileName);
|
||||||
if(size <= offset){
|
if(size <= offset){
|
||||||
processFileInTheEnd(pFile, *fileName);
|
processFileInTheEnd(pFile, *fileName);
|
||||||
tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName);
|
tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName);
|
||||||
}else{
|
}else{
|
||||||
char* data = readFile(pFile, &offset, size);
|
int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName);
|
||||||
if(data != NULL){
|
tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log clusterId:%"PRId64",ret:%d", clusterId, code);
|
||||||
sendSlowLog(clusterId, data, pFile, offset, SLOW_LOG_READ_BEGINNIG, *fileName, pTransporter, epSet);
|
*fileName = NULL;
|
||||||
*fileName = NULL;
|
|
||||||
}
|
|
||||||
tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log file:%p, data:%s", pFile, data);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -500,17 +513,8 @@ static void monitorSendSlowLogAtRunning(int64_t clusterId){
|
||||||
tscDebug("[monitor] monitorSendSlowLogAtRunning truncate file to 0 file:%p", pClient->pFile);
|
tscDebug("[monitor] monitorSendSlowLogAtRunning truncate file to 0 file:%p", pClient->pFile);
|
||||||
pClient->offset = 0;
|
pClient->offset = 0;
|
||||||
}else{
|
}else{
|
||||||
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
|
int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL);
|
||||||
if(pInst == NULL){
|
tscDebug("[monitor] monitorSendSlowLogAtRunning send slow log clusterId:%"PRId64",ret:%d", clusterId, code);
|
||||||
tscError("failed to get app instance by clusterId:%" PRId64, clusterId);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
|
||||||
char* data = readFile(pClient->pFile, &pClient->offset, size);
|
|
||||||
if(data != NULL){
|
|
||||||
sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep);
|
|
||||||
}
|
|
||||||
tscDebug("[monitor] monitorSendSlowLogAtRunning send slow log:%s", data);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -532,16 +536,8 @@ static bool monitorSendSlowLogAtQuit(int64_t clusterId) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}else{
|
}else{
|
||||||
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
|
int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL);
|
||||||
if(pInst == NULL) {
|
tscDebug("[monitor] monitorSendSlowLogAtQuit send slow log clusterId:%"PRId64",ret:%d", clusterId, code);
|
||||||
return true;
|
|
||||||
}
|
|
||||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
|
||||||
char* data = readFile(pClient->pFile, &pClient->offset, size);
|
|
||||||
if(data != NULL){
|
|
||||||
sendSlowLog(clusterId, data, pClient->pFile, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep);
|
|
||||||
}
|
|
||||||
tscInfo("[monitor] monitorSendSlowLogAtQuit send slow log:%s", data);
|
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -558,16 +554,11 @@ static void monitorSendAllSlowLogAtQuit(){
|
||||||
pClient->pFile = NULL;
|
pClient->pFile = NULL;
|
||||||
}else if(pClient->offset == 0){
|
}else if(pClient->offset == 0){
|
||||||
int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
|
int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
|
||||||
SAppInstInfo* pInst = getAppInstByClusterId(*clusterId);
|
int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL);
|
||||||
if(pInst == NULL) {
|
tscDebug("[monitor] monitorSendAllSlowLogAtQuit send slow log clusterId:%"PRId64",ret:%d", *clusterId, code);
|
||||||
continue;
|
if (code == 0){
|
||||||
}
|
|
||||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
|
||||||
char* data = readFile(pClient->pFile, &pClient->offset, size);
|
|
||||||
if(data != NULL && sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_QUIT, NULL, pInst->pTransporter, &ep) == 0){
|
|
||||||
quitCnt ++;
|
quitCnt ++;
|
||||||
}
|
}
|
||||||
tscInfo("[monitor] monitorSendAllSlowLogAtQuit send slow log :%s", data);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -613,12 +604,8 @@ static void monitorSendAllSlowLog(){
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL);
|
||||||
char* data = readFile(pClient->pFile, &pClient->offset, size);
|
tscDebug("[monitor] monitorSendAllSlowLog send slow log clusterId:%"PRId64",ret:%d", *clusterId, code);
|
||||||
if(data != NULL){
|
|
||||||
sendSlowLog(*clusterId, data, NULL, pClient->offset, SLOW_LOG_READ_RUNNING, NULL, pInst->pTransporter, &ep);
|
|
||||||
}
|
|
||||||
tscDebug("[monitor] monitorSendAllSlowLog send slow log :%s", data);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -631,7 +618,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
char namePrefix[PATH_MAX] = {0};
|
char namePrefix[PATH_MAX] = {0};
|
||||||
if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, pInst->clusterId) < 0) {
|
if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, clusterId) < 0) {
|
||||||
tscError("failed to generate slow log file name prefix");
|
tscError("failed to generate slow log file name prefix");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -656,7 +643,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
|
||||||
if (strcmp(name, ".") == 0 ||
|
if (strcmp(name, ".") == 0 ||
|
||||||
strcmp(name, "..") == 0 ||
|
strcmp(name, "..") == 0 ||
|
||||||
strstr(name, namePrefix) == NULL) {
|
strstr(name, namePrefix) == NULL) {
|
||||||
tscInfo("skip file:%s, for cluster id:%"PRIx64, name, pInst->clusterId);
|
tscInfo("skip file:%s, for cluster id:%"PRIx64, name, clusterId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -672,9 +659,8 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
|
||||||
char *tmp = taosStrdup(filename);
|
char *tmp = taosStrdup(filename);
|
||||||
monitorSendSlowLogAtBeginning(pInst->clusterId, &tmp, pFile, 0, pInst->pTransporter, &ep);
|
monitorSendSlowLogAtBeginning(clusterId, &tmp, pFile, 0);
|
||||||
taosMemoryFree(tmp);
|
taosMemoryFree(tmp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -690,28 +676,6 @@ static void* monitorThreadFunc(void *param){
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
char tmpPath[PATH_MAX] = {0};
|
|
||||||
if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0){
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosMulModeMkDir(tmpPath, 0777, true) != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
printf("failed to create dir:%s since %s", tmpPath, terrstr());
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsem2_init(&monitorSem, 0, 0) != 0) {
|
|
||||||
tscError("sem init error since %s", terrstr());
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
monitorQueue = taosOpenQueue();
|
|
||||||
if(monitorQueue == NULL){
|
|
||||||
tscError("open queue error since %s", terrstr());
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) {
|
if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -738,16 +702,12 @@ static void* monitorThreadFunc(void *param){
|
||||||
if (slowLogData != NULL) {
|
if (slowLogData != NULL) {
|
||||||
if (slowLogData->type == SLOW_LOG_READ_BEGINNIG){
|
if (slowLogData->type == SLOW_LOG_READ_BEGINNIG){
|
||||||
if(slowLogData->pFile != NULL){
|
if(slowLogData->pFile != NULL){
|
||||||
SAppInstInfo* pInst = getAppInstByClusterId(slowLogData->clusterId);
|
monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile, slowLogData->offset);
|
||||||
if(pInst != NULL) {
|
|
||||||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
|
||||||
monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile, slowLogData->offset, pInst->pTransporter, &ep);
|
|
||||||
}
|
|
||||||
}else{
|
}else{
|
||||||
monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);
|
monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);
|
||||||
}
|
}
|
||||||
} else if(slowLogData->type == SLOW_LOG_WRITE){
|
} else if(slowLogData->type == SLOW_LOG_WRITE){
|
||||||
monitorWriteSlowLog2File(slowLogData, tmpPath);
|
monitorWriteSlowLog2File(slowLogData, tmpSlowLogPath);
|
||||||
} else if(slowLogData->type == SLOW_LOG_READ_RUNNING){
|
} else if(slowLogData->type == SLOW_LOG_READ_RUNNING){
|
||||||
monitorSendSlowLogAtRunning(slowLogData->clusterId);
|
monitorSendSlowLogAtRunning(slowLogData->clusterId);
|
||||||
} else if(slowLogData->type == SLOW_LOG_READ_QUIT){
|
} else if(slowLogData->type == SLOW_LOG_READ_QUIT){
|
||||||
|
@ -799,27 +759,59 @@ static void tscMonitorStop() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void monitorInit() {
|
int32_t monitorInit() {
|
||||||
tscInfo("[monitor] tscMonitor init");
|
tscInfo("[monitor] tscMonitor init");
|
||||||
monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||||
if (monitorCounterHash == NULL) {
|
if (monitorCounterHash == NULL) {
|
||||||
tscError("failed to create monitorCounterHash");
|
tscError("failed to create monitorCounterHash");
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient);
|
taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient);
|
||||||
|
|
||||||
monitorSlowLogHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
monitorSlowLogHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||||
if (monitorSlowLogHash == NULL) {
|
if (monitorSlowLogHash == NULL) {
|
||||||
tscError("failed to create monitorSlowLogHash");
|
tscError("failed to create monitorSlowLogHash");
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient);
|
taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient);
|
||||||
|
|
||||||
monitorTimer = taosTmrInit(0, 0, 0, "MONITOR");
|
monitorTimer = taosTmrInit(0, 0, 0, "MONITOR");
|
||||||
if (monitorTimer == NULL) {
|
if (monitorTimer == NULL) {
|
||||||
tscError("failed to create monitor timer");
|
tscError("failed to create monitor timer");
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath)) < 0){
|
||||||
|
terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosMulModeMkDir(tmpSlowLogPath, 0777, true) != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
tscError("failed to create dir:%s since %s", tmpSlowLogPath, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsem2_init(&monitorSem, 0, 0) != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
tscError("sem init error since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
monitorQueue = taosOpenQueue();
|
||||||
|
if(monitorQueue == NULL){
|
||||||
|
tscError("open queue error since %s", terrstr());
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosInitRWLatch(&monitorLock);
|
taosInitRWLatch(&monitorLock);
|
||||||
tscMonitortInit();
|
if (tscMonitortInit() != 0){
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void monitorClose() {
|
void monitorClose() {
|
||||||
|
@ -844,10 +836,7 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data){
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
*slowLogData = data;
|
*slowLogData = data;
|
||||||
tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%d", slowLogData->clusterId, slowLogData->type);
|
tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%s, data:%s", slowLogData->clusterId, queueTypeStr[slowLogData->type], slowLogData->data);
|
||||||
while (atomic_load_32(&slowLogFlag) == -1) {
|
|
||||||
taosMsleep(5);
|
|
||||||
}
|
|
||||||
if (taosWriteQitem(monitorQueue, slowLogData) == 0){
|
if (taosWriteQitem(monitorQueue, slowLogData) == 0){
|
||||||
tsem2_post(&monitorSem);
|
tsem2_post(&monitorSem);
|
||||||
}else{
|
}else{
|
||||||
|
|
|
@ -154,13 +154,14 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
if(taosHashGet(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES) == NULL){
|
if(taosHashGet(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES) == NULL){
|
||||||
if(taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo, POINTER_BYTES) != 0){
|
if(taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo, POINTER_BYTES) != 0){
|
||||||
tscError("failed to put appInfo into appInfo.pInstMapByClusterId");
|
tscError("failed to put appInfo into appInfo.pInstMapByClusterId");
|
||||||
|
}else{
|
||||||
|
MonitorSlowLogData data = {0};
|
||||||
|
data.clusterId = pTscObj->pAppInfo->clusterId;
|
||||||
|
data.type = SLOW_LOG_READ_BEGINNIG;
|
||||||
|
monitorPutData2MonitorQueue(data);
|
||||||
|
monitorClientSlowQueryInit(connectRsp.clusterId);
|
||||||
|
monitorClientSQLReqInit(connectRsp.clusterId);
|
||||||
}
|
}
|
||||||
MonitorSlowLogData data = {0};
|
|
||||||
data.clusterId = pTscObj->pAppInfo->clusterId;
|
|
||||||
data.type = SLOW_LOG_READ_BEGINNIG;
|
|
||||||
monitorPutData2MonitorQueue(data);
|
|
||||||
monitorClientSlowQueryInit(connectRsp.clusterId);
|
|
||||||
monitorClientSQLReqInit(connectRsp.clusterId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexLock(&clientHbMgr.lock);
|
taosThreadMutexLock(&clientHbMgr.lock);
|
||||||
|
|
Loading…
Reference in New Issue