Merge pull request #26903 from taosdata/fix/TD-31015

fix:[TD-31015]monitor close first before slow log thread exit
This commit is contained in:
dapan1121 2024-08-01 10:01:27 +08:00 committed by GitHub
commit 65181fb1d0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 19 additions and 42 deletions

View File

@@ -12,13 +12,13 @@
SRWLatch monitorLock; SRWLatch monitorLock;
void* monitorTimer; void* monitorTimer;
SHashObj* monitorCounterHash; SHashObj* monitorCounterHash;
int32_t slowLogFlag = -1; int32_t monitorFlag = 0;
int32_t monitorFlag = -1;
int32_t quitCnt = 0; int32_t quitCnt = 0;
tsem2_t monitorSem; tsem2_t monitorSem;
STaosQueue* monitorQueue; STaosQueue* monitorQueue;
SHashObj* monitorSlowLogHash; SHashObj* monitorSlowLogHash;
char tmpSlowLogPath[PATH_MAX] = {0}; char tmpSlowLogPath[PATH_MAX] = {0};
TdThread monitorThread;
static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) { static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) {
int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir); int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir);
@@ -113,11 +113,11 @@ static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId); tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId);
} }
MonitorSlowLogData tmp = {.clusterId = p->clusterId, MonitorSlowLogData tmp = {.clusterId = p->clusterId,
.type = p->type, .type = p->type,
.fileName = p->fileName, .fileName = p->fileName,
.pFile = p->pFile, .pFile = p->pFile,
.offset = p->offset, .offset = p->offset,
.data = NULL}; .data = NULL};
if (monitorPutData2MonitorQueue(tmp) == 0) { if (monitorPutData2MonitorQueue(tmp) == 0) {
p->fileName = NULL; p->fileName = NULL;
} }
@@ -164,7 +164,7 @@ static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITO
int64_t transporterId = 0; int64_t transporterId = 0;
return asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo); return asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo);
FAILED: FAILED:
monitorFreeSlowLogDataEx(param); monitorFreeSlowLogDataEx(param);
return TAOS_GET_TERRNO(TSDB_CODE_TSC_INTERNAL_ERROR); return TAOS_GET_TERRNO(TSDB_CODE_TSC_INTERNAL_ERROR);
} }
@@ -276,12 +276,10 @@ void monitorCreateClient(int64_t clusterId) {
tscInfo("[monitor] monitorCreateClient for %" PRIx64 "finished %p.", clusterId, pMonitor); tscInfo("[monitor] monitorCreateClient for %" PRIx64 "finished %p.", clusterId, pMonitor);
} }
taosWUnLockLatch(&monitorLock); taosWUnLockLatch(&monitorLock);
if (-1 != atomic_val_compare_exchange_32(&monitorFlag, -1, 0)) {
tscDebug("[monitor] monitorFlag already is 0");
}
return; return;
fail: fail:
destroyMonitorClient(&pMonitor); destroyMonitorClient(&pMonitor);
taosWUnLockLatch(&monitorLock); taosWUnLockLatch(&monitorLock);
} }
@@ -301,7 +299,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
tscError("failed to add metric to collector"); tscError("failed to add metric to collector");
(void)taos_counter_destroy(newCounter); (void)taos_counter_destroy(newCounter);
goto end; goto end;
} }
if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) { if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) {
tscError("failed to put counter to monitor"); tscError("failed to put counter to monitor");
(void)taos_counter_destroy(newCounter); (void)taos_counter_destroy(newCounter);
@@ -310,7 +308,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name,
newCounter); newCounter);
end: end:
taosWUnLockLatch(&monitorLock); taosWUnLockLatch(&monitorLock);
} }
@@ -339,7 +337,7 @@ void monitorCounterInc(int64_t clusterId, const char* counterName, const char**
} }
tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName); tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName);
end: end:
taosWUnLockLatch(&monitorLock); taosWUnLockLatch(&monitorLock);
} }
@@ -348,8 +346,6 @@ const char* monitorResultStr(SQL_RESULT_CODE code) {
return result_state[code]; return result_state[code];
} }
static void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); }
static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpPath) { static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpPath) {
TdFilePtr pFile = NULL; TdFilePtr pFile = NULL;
void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES); void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES);
@@ -693,20 +689,10 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
static void* monitorThreadFunc(void* param) { static void* monitorThreadFunc(void* param) {
setThreadName("client-monitor-slowlog"); setThreadName("client-monitor-slowlog");
#ifdef WINDOWS
if (taosCheckCurrentInDll()) {
atexit(monitorThreadFuncUnexpectedStopped);
}
#endif
if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) {
return NULL;
}
tscDebug("monitorThreadFunc start"); tscDebug("monitorThreadFunc start");
int64_t quitTime = 0; int64_t quitTime = 0;
while (1) { while (1) {
if (atomic_load_32(&slowLogFlag) > 0) { if (atomic_load_32(&monitorFlag) == 1) {
if (quitCnt == 0) { if (quitCnt == 0) {
monitorSendAllSlowLogAtQuit(); monitorSendAllSlowLogAtQuit();
if (quitCnt == 0) { if (quitCnt == 0) {
@@ -752,7 +738,6 @@ static void* monitorThreadFunc(void* param) {
} }
(void)tsem2_timewait(&monitorSem, 100); (void)tsem2_timewait(&monitorSem, 100);
} }
atomic_store_32(&slowLogFlag, -2);
return NULL; return NULL;
} }
@@ -767,7 +752,6 @@ static int32_t tscMonitortInit() {
return TSDB_CODE_TSC_INTERNAL_ERROR; return TSDB_CODE_TSC_INTERNAL_ERROR;
} }
TdThread monitorThread;
if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) { if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) {
tscError("failed to create monitor thread since %s", strerror(errno)); tscError("failed to create monitor thread since %s", strerror(errno));
return TSDB_CODE_TSC_INTERNAL_ERROR; return TSDB_CODE_TSC_INTERNAL_ERROR;
@@ -778,13 +762,9 @@ static int32_t tscMonitortInit() {
} }
static void tscMonitorStop() { static void tscMonitorStop() {
if (atomic_val_compare_exchange_32(&slowLogFlag, 0, 1)) { if (taosCheckPthreadValid(monitorThread)) {
tscDebug("monitor thread already stopped"); (void)taosThreadJoin(monitorThread, NULL);
return; (void)taosThreadClear(&monitorThread);
}
while (atomic_load_32(&slowLogFlag) > 0) {
taosMsleep(100);
} }
} }
@@ -842,10 +822,7 @@ int32_t monitorInit() {
void monitorClose() { void monitorClose() {
tscInfo("[monitor] tscMonitor close"); tscInfo("[monitor] tscMonitor close");
taosWLockLatch(&monitorLock); taosWLockLatch(&monitorLock);
atomic_store_32(&monitorFlag, 1);
if (atomic_val_compare_exchange_32(&monitorFlag, 0, 1)) {
tscDebug("[monitor] monitorFlag is not 0");
}
tscMonitorStop(); tscMonitorStop();
sendAllCounter(); sendAllCounter();
taosHashCleanup(monitorCounterHash); taosHashCleanup(monitorCounterHash);
@@ -860,7 +837,7 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) {
int32_t code = 0; int32_t code = 0;
MonitorSlowLogData* slowLogData = NULL; MonitorSlowLogData* slowLogData = NULL;
if (atomic_load_32(&slowLogFlag) == -2) { if (atomic_load_32(&monitorFlag) == 1) {
tscError("[monitor] slow log thread is exiting"); tscError("[monitor] slow log thread is exiting");
return -1; return -1;
} }