From 15dd2721541109156c7e2a9168da9e771cf34d06 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 11 May 2022 02:58:17 +0800 Subject: [PATCH] enh(tmq): do not show closed consumer --- include/common/tmsg.h | 2 +- source/client/src/tmq.c | 20 ++++++++++++-------- source/dnode/mnode/impl/inc/mndConsumer.h | 1 + source/dnode/mnode/impl/src/mndConsumer.c | 11 ++++++++--- tests/test/c/tmqSim.c | 13 +++++++------ 5 files changed, 29 insertions(+), 18 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 909b7d0877..ff2e419c75 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1493,7 +1493,7 @@ typedef struct { } SMVSubscribeRsp; typedef struct { - char name[TSDB_TABLE_FNAME_LEN]; + char name[TSDB_TOPIC_FNAME_LEN]; int8_t igNotExists; } SMDropTopicReq; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 0c73e000a8..0ce689f19c 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1307,15 +1307,19 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) { } tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { - tmq_list_t* lst = tmq_list_new(); - tmq_resp_err_t rsp = tmq_subscribe(tmq, lst); - tmq_list_destroy(lst); - if (rsp == TMQ_RESP_ERR__SUCCESS) { - // TODO: free resources - return TMQ_RESP_ERR__SUCCESS; - } else { - return TMQ_RESP_ERR__FAIL; + if (tmq->status == TMQ_CONSUMER_STATUS__READY) { + tmq_list_t* lst = tmq_list_new(); + tmq_resp_err_t rsp = tmq_subscribe(tmq, lst); + tmq_list_destroy(lst); + if (rsp == TMQ_RESP_ERR__SUCCESS) { + // TODO: free resources + return TMQ_RESP_ERR__SUCCESS; + } else { + return TMQ_RESP_ERR__FAIL; + } } + // TODO: free resources + return TMQ_RESP_ERR__SUCCESS; } const char* tmq_err2str(tmq_resp_err_t err) { diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index a8bfe91cbf..210e336ac2 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -29,6 +29,7 @@ enum { MQ_CONSUMER_STATUS__LOST, MQ_CONSUMER_STATUS__LOST_IN_REB, MQ_CONSUMER_STATUS__LOST_REBD, + MQ_CONSUMER_STATUS__REMOVED, }; int32_t mndInitConsumer(SMnode *pMnode); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 155ea6ae93..9c8c6d32eb 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -486,6 +486,14 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { } } + if (pConsumerOld && taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && + taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { + /*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/ + /*pConsumerNew->updateType = */ + /*}*/ + goto SUBSCRIBE_OVER; + } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg); if (pTrans == NULL) goto SUBSCRIBE_OVER; if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER; @@ -684,9 +692,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY || pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) { pOldConsumer->status = MQ_CONSUMER_STATUS__READY; - // TODO: remove - /*if (taosArrayGetSize(pOldConsumer->assignedTopics) == 0) {*/ - /*}*/ } else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB || pOldConsumer->status == MQ_CONSUMER_STATUS__LOST) { pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD; diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 73cdf7f59c..4a59d18d87 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -331,12 +331,6 @@ void loop_consume(SThreadInfo* pInfo) { } } - err = tmq_consumer_close(pInfo->tmq); - if (err) { - printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); - exit(-1); - } - pInfo->consumeMsgCnt = totalMsgs; pInfo->consumeRowCnt = totalRows; @@ -372,6 +366,13 @@ void* consumeThreadFunc(void* param) { return NULL; } + err = tmq_consumer_close(pInfo->tmq); + if (err) { + printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); + exit(-1); + } + pInfo->tmq = NULL; + // save consume result into consumeresult table saveConsumeResult(pInfo);