diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index d05abb2051..7c342b85d4 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -455,6 +455,8 @@ enum { void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type); +void tmqMgmtClose(void); + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 56f89ffba6..f65edc103a 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -57,8 +57,6 @@ void taos_cleanup(void) { } monitorClose(); - taosHashCleanup(appInfo.pInstMap); - taosHashCleanup(appInfo.pInstMapByClusterId); tscStopCrashReport(); hbMgrCleanUp(); @@ -85,6 +83,8 @@ void taos_cleanup(void) { taosConvDestroy(); + tmqMgmtClose(); + tscInfo("all local resources released"); taosCleanupCfg(); taosCloseLog(); diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index d1a9897caa..ae2a57ba97 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -682,7 +682,7 @@ static void* monitorThreadFunc(void *param){ tscDebug("monitorThreadFunc start"); int64_t quitTime = 0; while (1) { - if (slowLogFlag > 0) { + if (atomic_load_32(&slowLogFlag) > 0 > 0) { if(quitCnt == 0){ monitorSendAllSlowLogAtQuit(); if(quitCnt == 0){ @@ -727,10 +727,7 @@ static void* monitorThreadFunc(void *param){ } tsem2_timewait(&monitorSem, 100); } - - taosCloseQueue(monitorQueue); - tsem2_destroy(&monitorSem); - slowLogFlag = -2; + atomic_store_32(&slowLogFlag, -2); return NULL; } @@ -826,10 +823,16 @@ void monitorClose() { taosHashCleanup(monitorCounterHash); taosHashCleanup(monitorSlowLogHash); taosTmrCleanUp(monitorTimer); + taosCloseQueue(monitorQueue); + tsem2_destroy(&monitorSem); taosWUnLockLatch(&monitorLock); } int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data){ + if (atomic_load_32(&slowLogFlag) == -2) { + tscError("[monitor] slow log thread is exiting"); + return -1; + } MonitorSlowLogData* slowLogData = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0); if (slowLogData == NULL) { tscError("[monitor] failed to allocate slow log data"); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 10d5140a0b..3c6ef00bf4 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -30,7 +30,6 @@ #define DEFAULT_ASKEP_INTERVAL 1000 struct SMqMgmt { - int8_t inited; tmr_h timer; int32_t rsetId; }; @@ -1057,6 +1056,16 @@ void tmqFreeImpl(void* handle) { taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl); taos_close_internal(tmq->pTscObj); + + if(tmq->commitTimer) { + taosTmrStopA(&tmq->commitTimer); + } + if(tmq->epTimer) { + taosTmrStopA(&tmq->epTimer); + } + if(tmq->hbLiveTimer) { + taosTmrStopA(&tmq->hbLiveTimer); + } taosMemoryFree(tmq); tscDebug("consumer:0x%" PRIx64 " closed", id); @@ -1076,6 +1085,18 @@ static void tmqMgmtInit(void) { } } +void tmqMgmtClose(void) { + if (tmqMgmt.timer) { + taosTmrCleanUp(tmqMgmt.timer); + tmqMgmt.timer = NULL; + } + + if (tmqMgmt.rsetId >= 0) { + taosCloseRef(tmqMgmt.rsetId); + tmqMgmt.rsetId = -1; + } +} + #define SET_ERROR_MSG_TMQ(MSG) \ if (errstr != NULL) snprintf(errstr, errstrLen, MSG); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 795c21c234..fc92be8214 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -201,6 +201,7 @@ void schedulerDestroy(void) { } SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock); + taosTmrCleanUp(schMgmt.timer); qWorkerDestroy(&schMgmt.queryMgmt); schMgmt.queryMgmt = NULL; }