diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 7a84215e12..0d22257433 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -454,6 +454,8 @@ enum { void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type); +void tmqMgmtClose(void); + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 56f89ffba6..d0bd1ec650 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -57,8 +57,6 @@ void taos_cleanup(void) { } monitorClose(); - taosHashCleanup(appInfo.pInstMap); - taosHashCleanup(appInfo.pInstMapByClusterId); tscStopCrashReport(); hbMgrCleanUp(); @@ -88,6 +86,9 @@ void taos_cleanup(void) { tscInfo("all local resources released"); taosCleanupCfg(); taosCloseLog(); + tmqMgmtClose(); + taosHashCleanup(appInfo.pInstMap); + taosHashCleanup(appInfo.pInstMapByClusterId); } static setConfRet taos_set_config_imp(const char *config) { diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 21d1a528da..c448f115bb 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -29,7 +29,6 @@ #define DEFAULT_HEARTBEAT_INTERVAL 3000 struct SMqMgmt { - int8_t inited; tmr_h timer; int32_t rsetId; }; @@ -1066,6 +1065,18 @@ void tmqFreeImpl(void* handle) { taos_close_internal(tmq->pTscObj); taosMemoryFree(tmq); + if(tmq->commitTimer) { + taosTmrStopA(tmq->commitTimer); + tmq->commitTimer = NULL; + } + if(tmq->epTimer) { + taosTmrStopA(tmq->epTimer); + tmq->epTimer = NULL; + } + if(tmq->hbLiveTimer) { + taosTmrStopA(tmq->hbLiveTimer); + tmq->hbLiveTimer = NULL; + } tscDebug("consumer:0x%" PRIx64 " closed", id); } @@ -1083,6 +1094,18 @@ static void tmqMgmtInit(void) { } } +void tmqMgmtClose(void) { + if (tmqMgmt.timer) { + taosTmrCleanUp(tmqMgmt.timer); + tmqMgmt.timer = NULL; + } + + if (tmqMgmt.rsetId >= 0) { + taosCloseRef(tmqMgmt.rsetId); + tmqMgmt.rsetId = -1; + } +} + #define SET_ERROR_MSG_TMQ(MSG) \ if (errstr != NULL) snprintf(errstr, errstrLen, MSG);