fix:[TS-4921] send data to queue error if monitor thread starts later or failed

This commit is contained in:
wangmm0220 2024-07-04 14:56:56 +08:00
parent 6c3410d064
commit 64e7c4c842
4 changed files with 48 additions and 37 deletions

View File

@ -65,7 +65,7 @@ typedef struct {
} MonitorSlowLogData;
void monitorClose();
void monitorInit();
int32_t monitorInit();
void monitorClientSQLReqInit(int64_t clusterKey);
void monitorClientSlowQueryInit(int64_t clusterId);

View File

@ -867,7 +867,10 @@ void taos_init_imp(void) {
tscError("failed to init conv");
return;
}
if (monitorInit() != 0){
tscError("failed to init monitor");
return;
}
rpcInit();
SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
@ -891,7 +894,6 @@ void taos_init_imp(void) {
taosThreadMutexInit(&appInfo.mutex, NULL);
tscCrashReportInit();
monitorInit();
tscDebug("client is initialized successfully");
}

View File

@ -18,6 +18,7 @@ int32_t quitCnt = 0;
tsem2_t monitorSem;
STaosQueue* monitorQueue;
SHashObj* monitorSlowLogHash;
char tmpSlowLogPath[PATH_MAX] = {0};
static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size){
if (tsTempDir == NULL) {
@ -690,28 +691,6 @@ static void* monitorThreadFunc(void *param){
}
#endif
char tmpPath[PATH_MAX] = {0};
if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0){
return NULL;
}
if (taosMulModeMkDir(tmpPath, 0777, true) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
printf("failed to create dir:%s since %s", tmpPath, terrstr());
return NULL;
}
if (tsem2_init(&monitorSem, 0, 0) != 0) {
tscError("sem init error since %s", terrstr());
return NULL;
}
monitorQueue = taosOpenQueue();
if(monitorQueue == NULL){
tscError("open queue error since %s", terrstr());
return NULL;
}
if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) {
return NULL;
}
@ -747,7 +726,7 @@ static void* monitorThreadFunc(void *param){
monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);
}
} else if(slowLogData->type == SLOW_LOG_WRITE){
monitorWriteSlowLog2File(slowLogData, tmpPath);
monitorWriteSlowLog2File(slowLogData, tmpSlowLogPath);
} else if(slowLogData->type == SLOW_LOG_READ_RUNNING){
monitorSendSlowLogAtRunning(slowLogData->clusterId);
} else if(slowLogData->type == SLOW_LOG_READ_QUIT){
@ -799,27 +778,59 @@ static void tscMonitorStop() {
}
}
void monitorInit() {
int32_t monitorInit() {
tscInfo("[monitor] tscMonitor init");
monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (monitorCounterHash == NULL) {
tscError("failed to create monitorCounterHash");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient);
monitorSlowLogHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (monitorSlowLogHash == NULL) {
tscError("failed to create monitorSlowLogHash");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient);
monitorTimer = taosTmrInit(0, 0, 0, "MONITOR");
if (monitorTimer == NULL) {
tscError("failed to create monitor timer");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath)) < 0){
terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
return -1;
}
if (taosMulModeMkDir(tmpSlowLogPath, 0777, true) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tscError("failed to create dir:%s since %s", tmpSlowLogPath, terrstr());
return -1;
}
if (tsem2_init(&monitorSem, 0, 0) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tscError("sem init error since %s", terrstr());
return -1;
}
monitorQueue = taosOpenQueue();
if(monitorQueue == NULL){
tscError("open queue error since %s", terrstr());
return -1;
}
taosInitRWLatch(&monitorLock);
tscMonitortInit();
if (tscMonitortInit() != 0){
return -1;
}
return 0;
}
void monitorClose() {
@ -845,9 +856,6 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data){
}
*slowLogData = data;
tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%d", slowLogData->clusterId, slowLogData->type);
while (atomic_load_32(&slowLogFlag) == -1) {
taosMsleep(5);
}
if (taosWriteQitem(monitorQueue, slowLogData) == 0){
tsem2_post(&monitorSem);
}else{

View File

@ -154,13 +154,14 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
if(taosHashGet(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES) == NULL){
if(taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo, POINTER_BYTES) != 0){
tscError("failed to put appInfo into appInfo.pInstMapByClusterId");
}else{
MonitorSlowLogData data = {0};
data.clusterId = pTscObj->pAppInfo->clusterId;
data.type = SLOW_LOG_READ_BEGINNIG;
monitorPutData2MonitorQueue(data);
monitorClientSlowQueryInit(connectRsp.clusterId);
monitorClientSQLReqInit(connectRsp.clusterId);
}
MonitorSlowLogData data = {0};
data.clusterId = pTscObj->pAppInfo->clusterId;
data.type = SLOW_LOG_READ_BEGINNIG;
monitorPutData2MonitorQueue(data);
monitorClientSlowQueryInit(connectRsp.clusterId);
monitorClientSQLReqInit(connectRsp.clusterId);
}
taosThreadMutexLock(&clientHbMgr.lock);