From 1a6a0d31070d1143caccbe78a8c6eb279f9bb975 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 29 May 2024 16:04:37 +0800 Subject: [PATCH 1/3] fix:[TD-30323]tmq close error if consumer is cleared --- source/client/src/clientTmq.c | 5 ++++- source/dnode/mnode/impl/src/mndConsumer.c | 4 ++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 3141e21f26..0446af3226 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1260,7 +1260,10 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t retryCnt = 0; while ((code = syncAskEp(tmq)) != 0) { if (retryCnt++ > MAX_RETRY_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) { - tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId); + tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s", tmq->consumerId, strerror(code)); + if(code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) { + code = 0; + } goto FAIL; } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index c7ae941389..9278fbce11 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -662,6 +662,10 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SMqConsumerObj *pConsumerNew = NULL; STrans *pTrans = NULL; + if(mndAcquireConsumer(pMnode, subscribe->consumerId) == NULL && taosArrayGetSize(subscribe->topicNames) == 0){ + goto _over; + } + code = checkAndSortTopic(pMnode, subscribe.topicNames); if(code != TSDB_CODE_SUCCESS){ goto _over; From ad075ef1dec28e22a640687b46c610f0c113c851 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 1 Jun 2024 16:19:17 +0800 Subject: [PATCH 2/3] fix:[TD-30323]tmq close error if consumer is cleared --- source/dnode/mnode/impl/src/mndConsumer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 9278fbce11..72a8c32056 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -662,7 +662,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SMqConsumerObj *pConsumerNew = NULL; STrans *pTrans = NULL; - if(mndAcquireConsumer(pMnode, subscribe->consumerId) == NULL && taosArrayGetSize(subscribe->topicNames) == 0){ + if(mndAcquireConsumer(pMnode, subscribe.consumerId) == NULL && taosArrayGetSize(subscribe.topicNames) == 0){ goto _over; } From a096e2c071b9defe1e39e273f1e6b9203208a37b Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 2 Jun 2024 21:28:53 +0800 Subject: [PATCH 3/3] fix:memory leak --- source/dnode/mnode/impl/src/mndConsumer.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 72a8c32056..3eef2afcc1 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -662,8 +662,12 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SMqConsumerObj *pConsumerNew = NULL; STrans *pTrans = NULL; - if(mndAcquireConsumer(pMnode, subscribe.consumerId) == NULL && taosArrayGetSize(subscribe.topicNames) == 0){ - goto _over; + if(taosArrayGetSize(subscribe.topicNames) == 0){ + SMqConsumerObj *pConsumerTmp = mndAcquireConsumer(pMnode, subscribe.consumerId); + if(pConsumerTmp == NULL){ + goto _over; + } + mndReleaseConsumer(pMnode, pConsumerTmp); } code = checkAndSortTopic(pMnode, subscribe.topicNames);