From bd83023b09335d441e5e543b6b9f666dce173ec3 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 4 Sep 2024 18:26:24 +0800 Subject: [PATCH] feat:[TD-30270] checkout subscribe logic if unsubscribe twice --- include/util/taoserror.h | 1 + source/dnode/mnode/impl/src/mndConsumer.c | 8 ++- source/util/src/terror.c | 1 + tests/parallel_test/cases.task | 1 + tests/system-test/7-tmq/td-30270.py | 80 +++++++++++++++++++++++ 5 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 tests/system-test/7-tmq/td-30270.py diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 4591c7fbcc..16027730f7 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -962,6 +962,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP TAOS_DEF_ERROR_CODE(0, 0x4013) #define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014) #define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015) +#define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index c5a440b549..7639349bac 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -511,6 +511,11 @@ static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerOb } } } + // no topics need to be rebalanced + if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { + code = TSDB_CODE_TMQ_NO_NEED_REBALANCE; + } + END: return code; } @@ -603,7 +608,8 @@ END: mndTransDrop(pTrans); tDeleteSMqConsumerObj(pConsumerNew); taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); - return code; + + return code == TSDB_CODE_TMQ_NO_NEED_REBALANCE ? 0 : code; } SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 58dde5cd23..f85c76f157 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -806,6 +806,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed valu TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only one vgroup if subscribe super table") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 43193128fa..00b17ec73f 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -229,6 +229,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/basic5.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ts-4674.py -N 3 -n 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-30270.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb1.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb2.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb3.py diff --git a/tests/system-test/7-tmq/td-30270.py b/tests/system-test/7-tmq/td-30270.py new file mode 100644 index 0000000000..73c7a88f65 --- /dev/null +++ b/tests/system-test/7-tmq/td-30270.py @@ -0,0 +1,80 @@ + +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from taos.tmq import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + clientCfgDict = {'debugFlag': 135} + updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict} + # updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict, 'tmqRowSize':1} + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + + def consume_test(self): + + tdSql.execute(f'create database if not exists d1') + tdSql.execute(f'use d1') + tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)') + tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)') + tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)') + tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)') + + + tdSql.execute(f'create topic topic_all as select * from st') + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + } + consumer = Consumer(consumer_dict) + + try: + consumer.unsubscribe() + consumer.unsubscribe() + consumer.subscribe(["topic_all"]) + consumer.subscribe(["topic_all"]) + except TmqError: + tdLog.exit(f"subscribe error") + + cnt = 0 + try: + while True: + res = consumer.poll(2) + if not res: + break + val = res.value() + if val is None: + print(f"null val") + continue + for block in val: + cnt += len(block.fetchall()) + + print(f"block {cnt} rows") + + finally: + consumer.unsubscribe(); + consumer.close() + def run(self): + self.consume_test() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())