From 64d8d63f3dece0839a2093f962b4c3e1b29128df Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 29 Jun 2023 10:47:55 +0800 Subject: [PATCH 1/3] fix:error in limit for tmq group --- source/dnode/mnode/impl/inc/mndSubscribe.h | 1 + source/dnode/mnode/impl/src/mndConsumer.c | 10 ++------- source/dnode/mnode/impl/src/mndSubscribe.c | 26 ++++++++++++++++++++++ 3 files changed, 29 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndSubscribe.h b/source/dnode/mnode/impl/inc/mndSubscribe.h index fad316ea12..ba4328b8fe 100644 --- a/source/dnode/mnode/impl/inc/mndSubscribe.h +++ b/source/dnode/mnode/impl/inc/mndSubscribe.h @@ -25,6 +25,7 @@ extern "C" { int32_t mndInitSubscribe(SMnode *pMnode); void mndCleanupSubscribe(SMnode *pMnode); +int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName); SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *CGroup, const char *topicName); SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key); void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 4395446e81..35ddf46b98 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -645,18 +645,12 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t newTopicNum = taosArrayGetSize(pTopicList); for(int i = 0; i < newTopicNum; i++){ - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, (const char*)cgroup, (const char*)taosArrayGetP(pTopicList, i)); - if(pSub == NULL) continue; - taosRLockLatch(&pSub->lock); - if(taosHashGetSize(pSub->consumerHash) > MND_MAX_GROUP_PER_TOPIC){ + int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char*)taosArrayGetP(pTopicList, i)); + if(gNum > MND_MAX_GROUP_PER_TOPIC){ terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE; code = terrno; - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); goto _over; } - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); } // check topic existence diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index ce3a4ea048..4b99ad6249 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -952,6 +952,32 @@ SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) { return pSub; } +int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) { + int32_t num = 0; + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + SMqSubscribeObj *pSub = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub); + if (pIter == NULL) break; + + + char topic[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CGROUP_LEN]; + mndSplitSubscribeKey(pSub->key, topic, cgroup, true); + if (strcmp(topic, topicName) != 0) { + sdbRelease(pSdb, pSub); + continue; + } + + num++; + sdbRelease(pSdb, pSub); + } + + return num; +} + void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) { SSdb *pSdb = pMnode->pSdb; sdbRelease(pSdb, pSub); From 225b422f5f6f257c9772c03fbe9aecfc72176798 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 29 Jun 2023 11:01:34 +0800 Subject: [PATCH 2/3] fix(stream): fix error in type bytes. --- source/common/src/systable.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index bb0e882fcd..b94b8a1058 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -160,9 +160,9 @@ static const SSysDbTableSchema streamSchema[] = { static const SSysDbTableSchema streamTaskSchema[] = { {.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, - {.name = "task_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "task_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "node_type", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, - {.name = "node_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; From b264640ca17e6caf5ca8eaec40b80bbe840c6428 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 29 Jun 2023 11:15:53 +0800 Subject: [PATCH 3/3] fix:error in limit for tmq group --- source/dnode/mnode/impl/src/mndConsumer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 35ddf46b98..e442561a92 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -646,7 +646,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t newTopicNum = taosArrayGetSize(pTopicList); for(int i = 0; i < newTopicNum; i++){ int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char*)taosArrayGetP(pTopicList, i)); - if(gNum > MND_MAX_GROUP_PER_TOPIC){ + if(gNum >= MND_MAX_GROUP_PER_TOPIC){ terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE; code = terrno; goto _over;