diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 057ff0f7c1..c02ff465c6 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -759,8 +759,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("message in vnode query queue is processing"); - if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME || - pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) && + if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME) && !syncIsReadyForRead(pVnode->sync)) { vnodeRedirectRpcMsg(pVnode, pMsg, terrno); return 0; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 7a20898911..93fd847d10 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -227,6 +227,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dropDbR3ConflictTransaction.py -N 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/basic5.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb.py -N 3 -n 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ts-4674.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb1.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb2.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb3.py diff --git a/tests/system-test/7-tmq/ts-4674.py b/tests/system-test/7-tmq/ts-4674.py new file mode 100644 index 0000000000..709debaef1 --- /dev/null +++ b/tests/system-test/7-tmq/ts-4674.py @@ -0,0 +1,157 @@ + +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from taos.tmq import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + clientCfgDict = {'debugFlag': 135} + updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict} + # updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict, 'tmqRowSize':1} + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + # def consume_TS_4674_Test(self): + # + # os.system("nohup taosBenchmark -y -B 1 -t 4 -S 1000 -n 1000000 -i 1000 -v 1 -a 3 > /dev/null 2>&1 &") + # time.sleep() + # tdSql.execute(f'create topic topic_all with meta as database test') + # consumer_dict = { + # "group.id": "g1", + # "td.connect.user": "root", + # "td.connect.pass": "taosdata", + # "auto.offset.reset": "earliest", + # } + # consumer = Consumer(consumer_dict) + # + # try: + # consumer.subscribe(["topic_all"]) + # except TmqError: + # tdLog.exit(f"subscribe error") + # + # try: + # while True: + # res = consumer.poll(5) + # if not res: + # print(f"null") + # continue + # val = res.value() + # if val is None: + # print(f"null") + # continue + # cnt = 0; + # for block in val: + # cnt += len(block.fetchall()) + # + # print(f"block {cnt} rows") + # + # finally: + # consumer.close() + + def get_leader(self): + tdLog.debug("get leader") + tdSql.query("show vnodes") + for result in tdSql.queryResult: + if result[3] == 'leader': + tdLog.debug("leader is %d"%(result[0])) + return result[0] + return -1 + + def balance_vnode(self): + leader_before = self.get_leader() + + while True: + leader_after = -1 + tdSql.query("balance vgroup leader") + while True: + leader_after = self.get_leader() + if leader_after != -1 : + break; + else: + time.sleep(1) + if leader_after != leader_before: + tdLog.debug("leader changed") + break; + else : + time.sleep(1) + + + def consume_TS_4674_Test(self): + + tdSql.execute(f'create database if not exists d1 replica 3 vgroups 1') + tdSql.execute(f'use d1') + tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)') + tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)') + tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)') + tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)') + + + tdSql.execute(f'create topic topic_all as select * from st') + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + } + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["topic_all"]) + except TmqError: + tdLog.exit(f"subscribe error") + + cnt = 0; + balance = False + try: + while True: + res = consumer.poll(2) + if not res: + print(f"null res") + if balance == False and cnt != 6 : + tdLog.exit(f"subscribe num != 6") + if balance == True : + if cnt != 8 : + tdLog.exit(f"subscribe num != 8") + # tdLog.debug(f"subscribe num != 8") + # continue + else : + break + self.balance_vnode() + balance = True + tdSql.execute(f'insert into t1 using st tags(1) values(now+5s, 11) (now+10s, 12)') + continue + val = res.value() + if val is None: + print(f"null val") + continue + for block in val: + cnt += len(block.fetchall()) + + print(f"block {cnt} rows") + + finally: + consumer.close() + def run(self): + self.consume_TS_4674_Test() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())