From ac460915d6e91d611b62088645f0ccf4660f99cc Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 10 May 2022 23:47:44 +0800 Subject: [PATCH] fix(tmq): show --- source/client/src/tmq.c | 11 +++++++++-- source/common/src/systable.c | 4 ++-- source/dnode/mnode/impl/src/mndConsumer.c | 17 ++++++++++++----- source/dnode/mnode/impl/src/mndSubscribe.c | 21 ++++++++++++++------- 4 files changed, 37 insertions(+), 16 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index c768e001c5..0c73e000a8 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1307,8 +1307,15 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) { } tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { - // TODO - return TMQ_RESP_ERR__SUCCESS; + tmq_list_t* lst = tmq_list_new(); + tmq_resp_err_t rsp = tmq_subscribe(tmq, lst); + tmq_list_destroy(lst); + if (rsp == TMQ_RESP_ERR__SUCCESS) { + // TODO: free resources + return TMQ_RESP_ERR__SUCCESS; + } else { + return TMQ_RESP_ERR__FAIL; + } } const char* tmq_err2str(tmq_resp_err_t err) { diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 5ff0282c87..81682bb734 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -262,7 +262,7 @@ static const SSysDbTableSchema topicSchema[] = { static const SSysDbTableSchema consumerSchema[] = { {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, - {.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, @@ -275,7 +275,7 @@ static const SSysDbTableSchema consumerSchema[] = { static const SSysDbTableSchema subscriptionSchema[] = { {.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "group_id", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, }; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 6c77c379e0..155ea6ae93 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -684,6 +684,9 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY || pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) { pOldConsumer->status = MQ_CONSUMER_STATUS__READY; + // TODO: remove + /*if (taosArrayGetSize(pOldConsumer->assignedTopics) == 0) {*/ + /*}*/ } else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB || pOldConsumer->status == MQ_CONSUMER_STATUS__LOST) { pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD; @@ -789,6 +792,10 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer); if (pShow->pIter == NULL) break; + if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { + sdbRelease(pSdb, pConsumer); + continue; + } taosRLockLatch(&pConsumer->lock); @@ -810,12 +817,12 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false); - // group id - char groupId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(varDataVal(groupId), pConsumer->cgroup, TSDB_CGROUP_LEN); - varDataSetLen(groupId, strlen(varDataVal(groupId))); + // consumer group + char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; + tstrncpy(varDataVal(cgroup), pConsumer->cgroup, TSDB_CGROUP_LEN); + varDataSetLen(cgroup, strlen(varDataVal(cgroup))); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)groupId, false); + colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false); // app id char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index c947a1913e..2a81f28edd 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -171,14 +171,21 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM return 0; } -static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) { +static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) { int32_t i = 0; while (key[i] != TMQ_SEPARATOR) { i++; } memcpy(cgroup, key, i); cgroup[i] = 0; - strcpy(topic, &key[i + 1]); + if (fullName) { + strcpy(topic, &key[i + 1]); + } else { + while (key[i] != '.') { + i++; + } + strcpy(topic, &key[i + 1]); + } return 0; } @@ -426,7 +433,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO pConsumerNew->updateType = CONSUMER_UPDATE__ADD; char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup); + mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); taosArrayPush(pConsumerNew->rebNewTopics, &topic); mndReleaseConsumer(pMnode, pConsumerOld); if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { @@ -444,7 +451,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO pConsumerNew->updateType = CONSUMER_UPDATE__REMOVE; char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup); + mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); taosArrayPush(pConsumerNew->rebRemovedTopics, &topic); mndReleaseConsumer(pMnode, pConsumerOld); if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { @@ -494,7 +501,7 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { // split sub key and extract topic char topic[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pRebInfo->key, topic, cgroup); + mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); ASSERT(pTopic); taosRLockLatch(&pTopic->lock); @@ -747,7 +754,7 @@ static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock // topic and cgroup char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - mndSplitSubscribeKey(pSub->key, topic, cgroup); + mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false); varDataSetLen(topic, strlen(varDataVal(topic))); varDataSetLen(cgroup, strlen(varDataVal(cgroup))); @@ -790,7 +797,7 @@ static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock // topic and cgroup char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - mndSplitSubscribeKey(pSub->key, topic, cgroup); + mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false); varDataSetLen(topic, strlen(varDataVal(topic))); varDataSetLen(cgroup, strlen(varDataVal(cgroup)));