From 37fd3d4fa8a545905e43ce998cca25a61a240b0f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 23 Jul 2024 16:29:10 +0800 Subject: [PATCH] fix:[TD-31017]process return value in client for tmq --- source/client/src/clientTmq.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 65e0d5fe91..7fb39a424b 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -697,7 +697,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us for (int32_t i = 0; i < numOfTopics; i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - if (pTopic != NULL) { + if (pTopic == NULL) { code = TSDB_CODE_TMQ_INVALID_TOPIC; taosRUnLockLatch(&tmq->lock); goto end; @@ -707,7 +707,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us numOfVgroups); for (int32_t j = 0; j < numOfVgroups; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - if (pVg != NULL) { + if (pVg == NULL) { code = TSDB_CODE_INVALID_PARA; taosRUnLockLatch(&tmq->lock); goto end; @@ -2446,6 +2446,8 @@ static int32_t innerClose(tmq_t* tmq){ } int32_t tmq_unsubscribe(tmq_t* tmq) { if (tmq == NULL) return TSDB_CODE_INVALID_PARA; + + tscInfo("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, tmq->status); int32_t code = 0; if (atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__CLOSED) { code = innerClose(tmq); @@ -2466,10 +2468,12 @@ int32_t tmq_consumer_close(tmq_t* tmq) { code = innerClose(tmq); if(code == 0){ atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); - (void)taosRemoveRef(tmqMgmt.rsetId, tmq->refId); } } + if (code == 0){ + (void)taosRemoveRef(tmqMgmt.rsetId, tmq->refId); + } return code; }