fix:[TD-30915]tmq exit elegantly
This commit is contained in:
parent
c3e09d0852
commit
fc544705fe
|
@ -454,6 +454,8 @@ enum {
|
||||||
|
|
||||||
void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type);
|
void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type);
|
||||||
|
|
||||||
|
void tmqMgmtClose(void);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -57,8 +57,6 @@ void taos_cleanup(void) {
|
||||||
}
|
}
|
||||||
|
|
||||||
monitorClose();
|
monitorClose();
|
||||||
taosHashCleanup(appInfo.pInstMap);
|
|
||||||
taosHashCleanup(appInfo.pInstMapByClusterId);
|
|
||||||
tscStopCrashReport();
|
tscStopCrashReport();
|
||||||
|
|
||||||
hbMgrCleanUp();
|
hbMgrCleanUp();
|
||||||
|
@ -88,6 +86,9 @@ void taos_cleanup(void) {
|
||||||
tscInfo("all local resources released");
|
tscInfo("all local resources released");
|
||||||
taosCleanupCfg();
|
taosCleanupCfg();
|
||||||
taosCloseLog();
|
taosCloseLog();
|
||||||
|
tmqMgmtClose();
|
||||||
|
taosHashCleanup(appInfo.pInstMap);
|
||||||
|
taosHashCleanup(appInfo.pInstMapByClusterId);
|
||||||
}
|
}
|
||||||
|
|
||||||
static setConfRet taos_set_config_imp(const char *config) {
|
static setConfRet taos_set_config_imp(const char *config) {
|
||||||
|
|
|
@ -29,7 +29,6 @@
|
||||||
#define DEFAULT_HEARTBEAT_INTERVAL 3000
|
#define DEFAULT_HEARTBEAT_INTERVAL 3000
|
||||||
|
|
||||||
struct SMqMgmt {
|
struct SMqMgmt {
|
||||||
int8_t inited;
|
|
||||||
tmr_h timer;
|
tmr_h timer;
|
||||||
int32_t rsetId;
|
int32_t rsetId;
|
||||||
};
|
};
|
||||||
|
@ -1066,6 +1065,18 @@ void tmqFreeImpl(void* handle) {
|
||||||
taos_close_internal(tmq->pTscObj);
|
taos_close_internal(tmq->pTscObj);
|
||||||
taosMemoryFree(tmq);
|
taosMemoryFree(tmq);
|
||||||
|
|
||||||
|
if(tmq->commitTimer) {
|
||||||
|
taosTmrStopA(tmq->commitTimer);
|
||||||
|
tmq->commitTimer = NULL;
|
||||||
|
}
|
||||||
|
if(tmq->epTimer) {
|
||||||
|
taosTmrStopA(tmq->epTimer);
|
||||||
|
tmq->epTimer = NULL;
|
||||||
|
}
|
||||||
|
if(tmq->hbLiveTimer) {
|
||||||
|
taosTmrStopA(tmq->hbLiveTimer);
|
||||||
|
tmq->hbLiveTimer = NULL;
|
||||||
|
}
|
||||||
tscDebug("consumer:0x%" PRIx64 " closed", id);
|
tscDebug("consumer:0x%" PRIx64 " closed", id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1083,6 +1094,18 @@ static void tmqMgmtInit(void) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tmqMgmtClose(void) {
|
||||||
|
if (tmqMgmt.timer) {
|
||||||
|
taosTmrCleanUp(tmqMgmt.timer);
|
||||||
|
tmqMgmt.timer = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tmqMgmt.rsetId >= 0) {
|
||||||
|
taosCloseRef(tmqMgmt.rsetId);
|
||||||
|
tmqMgmt.rsetId = -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#define SET_ERROR_MSG_TMQ(MSG) \
|
#define SET_ERROR_MSG_TMQ(MSG) \
|
||||||
if (errstr != NULL) snprintf(errstr, errstrLen, MSG);
|
if (errstr != NULL) snprintf(errstr, errstrLen, MSG);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue