From ddcf7c74f76ac04331db58ba2619ca5451d5bd23 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 21 Jun 2024 14:11:33 +0800 Subject: [PATCH] fix:[TS-5067] check if consumer belong to this cgroup:topic where drop cgroup --- source/dnode/mnode/impl/src/mndDef.c | 4 +- source/dnode/mnode/impl/src/mndSubscribe.c | 27 +++++++++- tests/system-test/7-tmq/tmq_taosx.py | 61 ++++++++++++++++++++++ 3 files changed, 88 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 091edc6ab0..5164557184 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -537,7 +537,7 @@ SMqSubscribeObj *tNewSubscribeObj(const char *key) { memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN); taosInitRWLatch(&pSubObj->lock); pSubObj->vgNum = 0; - pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); // TODO set hash free fp /*taosHashSetFreeFp(pSubObj->consumerHash, tDeleteSMqConsumerEp);*/ @@ -557,7 +557,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { pSubNew->withMeta = pSub->withMeta; pSubNew->vgNum = pSub->vgNum; - pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); // TODO set hash free fp /*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/ void *pIter = NULL; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index ffb723756c..ba9a4607cb 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -799,6 +799,28 @@ static int32_t initRebOutput(SMqRebOutputObj *rebOutput) { return 0; } +// This function only works when there are dirty consumers +static void checkConsumer(SMnode *pMnode, SMqSubscribeObj* pSub){ + void *pIter = NULL; + while (1) { + pIter = taosHashIterate(pSub->consumerHash, pIter); + if (pIter == NULL) { + break; + } + + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; + SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pConsumerEp->consumerId); + if (pConsumer != NULL) { + continue; + } + mError("consumer:0x%" PRIx64 " not exists in sdb for exception", pConsumerEp->consumerId); + taosArrayAddAll(pSub->unassignedVgs, pConsumerEp->vgs); + + taosArrayDestroy(pConsumerEp->vgs); + taosHashRemove(pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)); + } +} + static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput){ const char *key = rebInput->pRebInfo->key; SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, key); @@ -834,8 +856,9 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu mInfo("[rebalance] sub topic:%s has no consumers sub yet", key); } else { taosRLockLatch(&pSub->lock); - rebInput->oldConsumerNum = taosHashGetSize(pSub->consumerHash); - rebOutput->pSub = tCloneSubscribeObj(pSub); + rebOutput.pSub = tCloneSubscribeObj(pSub); + checkConsumer(pMnode, rebOutput.pSub); + rebInput.oldConsumerNum = taosHashGetSize(rebOutput.pSub->consumerHash); taosRUnLockLatch(&pSub->lock); mInfo("[rebalance] sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum); diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 39b9c6d34a..200708e4ec 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -496,7 +496,68 @@ class TDTestCase: consumer.close() print("consume_ts_4551 ok") + def consume_TS_5067_Test(self): + tdSql.execute(f'create database if not exists d1 vgroups 1') + tdSql.execute(f'use d1') + tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)') + tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)') + tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)') + tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)') + tdSql.execute(f'insert into t1 using st tags(1) values(now+5s, 11) (now+10s, 12)') + + tdSql.query("select * from st") + tdSql.checkRows(8) + + tdSql.execute(f'create topic t1 as select * from st') + tdSql.execute(f'create topic t2 as select * from st') + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + } + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["t1"]) + except TmqError: + tdLog.exit(f"subscribe error") + + index = 0 + try: + while True: + res = consumer.poll(1) + if not res: + if index != 1: + tdLog.exit("consume error") + break + val = res.value() + if val is None: + continue + cnt = 0; + for block in val: + cnt += len(block.fetchall()) + + if cnt != 8: + tdLog.exit("consume error") + + index += 1 + finally: + consumer.close() + + consumer1 = Consumer(consumer_dict) + try: + consumer1.subscribe(["t2"]) + except TmqError: + tdLog.exit(f"subscribe error") + + tdSql.execute(f'drop consumer group g1 on t1') + tdSql.query(f'show consumers') + tdSql.checkRows(1) + consumer1.close() + def run(self): + self.consume_TS_5067_Test() self.consumeTest() self.consume_ts_4544() self.consume_ts_4551()