Merge pull request #26453 from taosdata/fix/TD-30915

fix:[TD-30915]tmq exit elegantly
This commit is contained in:
dapan1121 2024-07-11 13:15:10 +08:00 committed by GitHub
commit 0d784d94d6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 35 additions and 8 deletions

View File

@ -455,6 +455,8 @@ enum {
void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type);
void tmqMgmtClose(void);
#ifdef __cplusplus
}
#endif

View File

@ -57,8 +57,6 @@ void taos_cleanup(void) {
}
monitorClose();
taosHashCleanup(appInfo.pInstMap);
taosHashCleanup(appInfo.pInstMapByClusterId);
tscStopCrashReport();
hbMgrCleanUp();
@ -85,6 +83,8 @@ void taos_cleanup(void) {
taosConvDestroy();
tmqMgmtClose();
tscInfo("all local resources released");
taosCleanupCfg();
taosCloseLog();

View File

@ -682,7 +682,7 @@ static void* monitorThreadFunc(void *param){
tscDebug("monitorThreadFunc start");
int64_t quitTime = 0;
while (1) {
if (slowLogFlag > 0) {
if (atomic_load_32(&slowLogFlag) > 0 > 0) {
if(quitCnt == 0){
monitorSendAllSlowLogAtQuit();
if(quitCnt == 0){
@ -727,10 +727,7 @@ static void* monitorThreadFunc(void *param){
}
tsem2_timewait(&monitorSem, 100);
}
taosCloseQueue(monitorQueue);
tsem2_destroy(&monitorSem);
slowLogFlag = -2;
atomic_store_32(&slowLogFlag, -2);
return NULL;
}
@ -826,10 +823,16 @@ void monitorClose() {
taosHashCleanup(monitorCounterHash);
taosHashCleanup(monitorSlowLogHash);
taosTmrCleanUp(monitorTimer);
taosCloseQueue(monitorQueue);
tsem2_destroy(&monitorSem);
taosWUnLockLatch(&monitorLock);
}
int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data){
if (atomic_load_32(&slowLogFlag) == -2) {
tscError("[monitor] slow log thread is exiting");
return -1;
}
MonitorSlowLogData* slowLogData = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0);
if (slowLogData == NULL) {
tscError("[monitor] failed to allocate slow log data");

View File

@ -30,7 +30,6 @@
#define DEFAULT_ASKEP_INTERVAL 1000
struct SMqMgmt {
int8_t inited;
tmr_h timer;
int32_t rsetId;
};
@ -1057,6 +1056,16 @@ void tmqFreeImpl(void* handle) {
taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl);
taos_close_internal(tmq->pTscObj);
if(tmq->commitTimer) {
taosTmrStopA(&tmq->commitTimer);
}
if(tmq->epTimer) {
taosTmrStopA(&tmq->epTimer);
}
if(tmq->hbLiveTimer) {
taosTmrStopA(&tmq->hbLiveTimer);
}
taosMemoryFree(tmq);
tscDebug("consumer:0x%" PRIx64 " closed", id);
@ -1076,6 +1085,18 @@ static void tmqMgmtInit(void) {
}
}
void tmqMgmtClose(void) {
if (tmqMgmt.timer) {
taosTmrCleanUp(tmqMgmt.timer);
tmqMgmt.timer = NULL;
}
if (tmqMgmt.rsetId >= 0) {
taosCloseRef(tmqMgmt.rsetId);
tmqMgmt.rsetId = -1;
}
}
#define SET_ERROR_MSG_TMQ(MSG) \
if (errstr != NULL) snprintf(errstr, errstrLen, MSG);

View File

@ -201,6 +201,7 @@ void schedulerDestroy(void) {
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
taosTmrCleanUp(schMgmt.timer);
qWorkerDestroy(&schMgmt.queryMgmt);
schMgmt.queryMgmt = NULL;
}