From 113f7124b561f960bb2c2da00cab23e345288ce5 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 13 Jan 2025 18:24:21 +0800 Subject: [PATCH] fix:[TD-33504]add test case --- tests/parallel_test/cases.task | 1 + tests/system-test/7-tmq/tmq_td33504.py | 84 ++++++++++++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100644 tests/system-test/7-tmq/tmq_td33504.py diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 3c93c729d4..21005d910b 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -329,6 +329,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5466.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td33504.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts-5473.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-32187.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-33225.py diff --git a/tests/system-test/7-tmq/tmq_td33504.py b/tests/system-test/7-tmq/tmq_td33504.py new file mode 100644 index 0000000000..085b245dd5 --- /dev/null +++ b/tests/system-test/7-tmq/tmq_td33504.py @@ -0,0 +1,84 @@ + +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from taos.tmq import * +from taos import * + +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def test(self): + tdSql.execute(f'create database if not exists db') + tdSql.execute(f'use db') + tdSql.execute(f'CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)') + tdSql.execute("INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-05 14:38:05.000',10.30000,219,0.31000)") + tdSql.execute("INSERT INTO d1002 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-05 14:38:05.000',10.30000,219,0.31000)") + tdSql.execute("INSERT INTO d1003 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-05 14:38:05.000',10.30000,219,0.31000)") + tdSql.execute("INSERT INTO d1004 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-05 14:38:05.000',10.30000,219,0.31000)") + + tdSql.execute(f'create topic t0 as select * from meters') + tdSql.execute(f'create topic t1 as select * from meters') + + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + } + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["t0"]) + except TmqError: + tdLog.exit(f"subscribe error") + + try: + res = consumer.poll(1) + print(res) + + consumer.unsubscribe() + + try: + consumer.subscribe(["t1"]) + except TmqError: + tdLog.exit(f"subscribe error") + + + res = consumer.poll(1) + print(res) + if res == None and taos_errno(None) != 0: + tdLog.exit(f"poll error %d" % taos_errno(None)) + + except TmqError: + tdLog.exit(f"poll error") + finally: + consumer.close() + + + def run(self): + self.test() + + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())