From 6c8303297c8d8a20d7e5756eb2856cf10baeb78f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 31 Jul 2024 14:36:05 +0800 Subject: [PATCH] fix:[TD-31015]monitor close first before slow log thread exit --- source/client/src/clientMonitor.c | 61 ++++++++++--------------------- 1 file changed, 19 insertions(+), 42 deletions(-) diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index 92a8fc3b29..4bb29f8d97 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -12,13 +12,13 @@ SRWLatch monitorLock; void* monitorTimer; SHashObj* monitorCounterHash; -int32_t slowLogFlag = -1; -int32_t monitorFlag = -1; +int32_t monitorFlag = 0; int32_t quitCnt = 0; tsem2_t monitorSem; STaosQueue* monitorQueue; SHashObj* monitorSlowLogHash; char tmpSlowLogPath[PATH_MAX] = {0}; +TdThread monitorThread; static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) { int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir); @@ -113,11 +113,11 @@ static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) { tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId); } MonitorSlowLogData tmp = {.clusterId = p->clusterId, - .type = p->type, - .fileName = p->fileName, - .pFile = p->pFile, - .offset = p->offset, - .data = NULL}; + .type = p->type, + .fileName = p->fileName, + .pFile = p->pFile, + .offset = p->offset, + .data = NULL}; if (monitorPutData2MonitorQueue(tmp) == 0) { p->fileName = NULL; } @@ -164,7 +164,7 @@ static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITO int64_t transporterId = 0; return asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo); -FAILED: + FAILED: monitorFreeSlowLogDataEx(param); return TAOS_GET_TERRNO(TSDB_CODE_TSC_INTERNAL_ERROR); } @@ -276,12 +276,10 @@ void monitorCreateClient(int64_t clusterId) { tscInfo("[monitor] monitorCreateClient for %" PRIx64 "finished %p.", clusterId, pMonitor); } taosWUnLockLatch(&monitorLock); - if (-1 != atomic_val_compare_exchange_32(&monitorFlag, -1, 0)) { - tscDebug("[monitor] monitorFlag already is 0"); - } + return; -fail: + fail: destroyMonitorClient(&pMonitor); taosWUnLockLatch(&monitorLock); } @@ -301,7 +299,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* tscError("failed to add metric to collector"); (void)taos_counter_destroy(newCounter); goto end; -} + } if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) { tscError("failed to put counter to monitor"); (void)taos_counter_destroy(newCounter); @@ -310,7 +308,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, newCounter); -end: + end: taosWUnLockLatch(&monitorLock); } @@ -339,7 +337,7 @@ void monitorCounterInc(int64_t clusterId, const char* counterName, const char** } tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName); -end: + end: taosWUnLockLatch(&monitorLock); } @@ -348,8 +346,6 @@ const char* monitorResultStr(SQL_RESULT_CODE code) { return result_state[code]; } -static void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); } - static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpPath) { TdFilePtr pFile = NULL; void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES); @@ -693,20 +689,10 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) { static void* monitorThreadFunc(void* param) { setThreadName("client-monitor-slowlog"); - -#ifdef WINDOWS - if (taosCheckCurrentInDll()) { - atexit(monitorThreadFuncUnexpectedStopped); - } -#endif - - if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) { - return NULL; - } tscDebug("monitorThreadFunc start"); int64_t quitTime = 0; while (1) { - if (atomic_load_32(&slowLogFlag) > 0) { + if (atomic_load_32(&monitorFlag) == 1) { if (quitCnt == 0) { monitorSendAllSlowLogAtQuit(); if (quitCnt == 0) { @@ -752,7 +738,6 @@ static void* monitorThreadFunc(void* param) { } (void)tsem2_timewait(&monitorSem, 100); } - atomic_store_32(&slowLogFlag, -2); return NULL; } @@ -767,7 +752,6 @@ static int32_t tscMonitortInit() { return TSDB_CODE_TSC_INTERNAL_ERROR; } - TdThread monitorThread; if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) { tscError("failed to create monitor thread since %s", strerror(errno)); return TSDB_CODE_TSC_INTERNAL_ERROR; @@ -778,13 +762,9 @@ static int32_t tscMonitortInit() { } static void tscMonitorStop() { - if (atomic_val_compare_exchange_32(&slowLogFlag, 0, 1)) { - tscDebug("monitor thread already stopped"); - return; - } - - while (atomic_load_32(&slowLogFlag) > 0) { - taosMsleep(100); + if (taosCheckPthreadValid(monitorThread)) { + (void)taosThreadJoin(monitorThread, NULL); + (void)taosThreadClear(&monitorThread); } } @@ -842,10 +822,7 @@ int32_t monitorInit() { void monitorClose() { tscInfo("[monitor] tscMonitor close"); taosWLockLatch(&monitorLock); - - if (atomic_val_compare_exchange_32(&monitorFlag, 0, 1)) { - tscDebug("[monitor] monitorFlag is not 0"); - } + atomic_store_32(&monitorFlag, 1); tscMonitorStop(); sendAllCounter(); taosHashCleanup(monitorCounterHash); @@ -860,7 +837,7 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) { int32_t code = 0; MonitorSlowLogData* slowLogData = NULL; - if (atomic_load_32(&slowLogFlag) == -2) { + if (atomic_load_32(&monitorFlag) == 1) { tscError("[monitor] slow log thread is exiting"); return -1; }