diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 83aff351dd..190a724151 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -253,7 +253,7 @@ void taos_cleanup(void) { taosCloseRef(id); nodesDestroyAllocatorSet(); - // cleanupAppInfo(); + cleanupAppInfo(); rpcCleanup(); tscDebug("rpc cleanup"); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index f4426fc94a..007a23720c 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -74,8 +74,9 @@ enum { }; typedef struct { - tmr_h timer; - int32_t rsetId; + tmr_h timer; + int32_t rsetId; + TdThreadMutex lock; } SMqMgmt; struct tmq_list_t { @@ -1603,13 +1604,21 @@ static void tmqMgmtInit(void) { tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ"); if (tmqMgmt.timer == NULL) { - tmqInitRes = terrno; + goto END; } tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl); if (tmqMgmt.rsetId < 0) { - tmqInitRes = terrno; + goto END; } + + if (taosThreadMutexInit(&tmqMgmt.lock, NULL) != 0){ + goto END; + } + return; + +END: + tmqInitRes = terrno; } void tmqMgmtClose(void) { @@ -1618,10 +1627,28 @@ void tmqMgmtClose(void) { tmqMgmt.timer = NULL; } + (void) taosThreadMutexLock(&tmqMgmt.lock); if (tmqMgmt.rsetId >= 0) { + tmq_t *tmq = taosIterateRef(tmqMgmt.rsetId, 0); + int64_t refId = 0; + + while (tmq) { + refId = tmq->refId; + if (refId == 0) { + break; + } + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); + + if (taosRemoveRef(tmqMgmt.rsetId, tmq->refId) != 0) { + qWarn("taosRemoveRef tmq refId:%" PRId64 " failed, error:%s", refId, tstrerror(terrno)); + } + + tmq = taosIterateRef(tmqMgmt.rsetId, refId); + } taosCloseRef(tmqMgmt.rsetId); tmqMgmt.rsetId = -1; } + (void)taosThreadMutexUnlock(&tmqMgmt.lock); } tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { @@ -2617,8 +2644,13 @@ int32_t tmq_unsubscribe(tmq_t* tmq) { int32_t tmq_consumer_close(tmq_t* tmq) { if (tmq == NULL) return TSDB_CODE_INVALID_PARA; + int32_t code = 0; + (void) taosThreadMutexLock(&tmqMgmt.lock); + if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__CLOSED){ + goto end; + } tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status); - int32_t code = tmq_unsubscribe(tmq); + code = tmq_unsubscribe(tmq); if (code == 0) { atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId); @@ -2626,6 +2658,9 @@ int32_t tmq_consumer_close(tmq_t* tmq) { tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code); } } + +end: + (void)taosThreadMutexUnlock(&tmqMgmt.lock); return code; }