From ba469cc9672c77b39a1a7e28cd1344fa575569c3 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 20 Jan 2025 19:28:03 +0800 Subject: [PATCH] fix:[TD-33556] tmq close elegantly to avoid invalid read in TD-32585 --- source/client/src/clientMain.c | 2 +- source/client/src/clientTmq.c | 28 +++++++++++++++++++++++++++- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 83aff351dd..190a724151 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -253,7 +253,7 @@ void taos_cleanup(void) { taosCloseRef(id); nodesDestroyAllocatorSet(); - // cleanupAppInfo(); + cleanupAppInfo(); rpcCleanup(); tscDebug("rpc cleanup"); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 78bceb4ef8..be3bcf25b3 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1619,6 +1619,24 @@ void tmqMgmtClose(void) { } if (tmqMgmt.rsetId >= 0) { + tmq_t *tmq = taosIterateRef(tmqMgmt.rsetId, 0); + int64_t refId = 0; + + while (tmq) { + refId = tmq->refId; + if (refId == 0) { + break; + } + taosWLockLatch(&tmq->lock); + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); + taosWUnLockLatch(&tmq->lock); + + if (taosRemoveRef(tmqMgmt.rsetId, tmq->refId) != 0) { + qWarn("taosRemoveRef tmq refId:%" PRId64 " failed, error:%s", refId, tstrerror(terrno)); + } + + tmq = taosIterateRef(tmqMgmt.rsetId, refId); + } taosCloseRef(tmqMgmt.rsetId); tmqMgmt.rsetId = -1; } @@ -2617,8 +2635,13 @@ int32_t tmq_unsubscribe(tmq_t* tmq) { int32_t tmq_consumer_close(tmq_t* tmq) { if (tmq == NULL) return TSDB_CODE_INVALID_PARA; + int32_t code = 0; + taosWLockLatch(&tmq->lock); + if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__CLOSED){ + goto end; + } tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status); - int32_t code = tmq_unsubscribe(tmq); + code = tmq_unsubscribe(tmq); if (code == 0) { atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId); @@ -2626,6 +2649,9 @@ int32_t tmq_consumer_close(tmq_t* tmq) { tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code); } } + + end: + taosWUnLockLatch(&tmq->lock); return code; }