Merge pull request #25504 from taosdata/test/3.0/TD-28952

test: add tmq and stream in compatibility
This commit is contained in:
Alex Duan 2024-04-26 18:50:26 +08:00 committed by GitHub
commit 6d6dda0ae3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 85 additions and 45 deletions

View File

@ -185,8 +185,8 @@ class TDTestCase:
# baseVersion = "3.0.1.8"
tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{BASEVERSION}")
tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -v 1 -y ")
os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -v 1 -y ")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database test '")
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/com_alltypedata.json -y")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database curdb '")
@ -196,49 +196,81 @@ class TDTestCase:
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'select min(ui) from curdb.meters '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'select max(bi) from curdb.meters '")
# os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ")
# os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ')
# os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ')
os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ")
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream trigger at_once into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ')
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ')
self.alter_string_in_file("0-others/tmqBasic.json", "/etc/taos/", cPath)
# os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/tmqBasic.json -y ")
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists tmq_test_topic as select current,voltage,phase from test.meters where voltage <= 106 and current <= 5;" ')
# create db/stb/select topic
db_topic = "db_test_topic"
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists {db_topic} with meta as database test" ')
stable_topic = "stable_test_meters_topic"
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists {stable_topic} as stable test.meters where tbname like \\"d3\\";" ')
select_topic = "select_test_meters_topic"
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists {select_topic} as select current,voltage,phase from test.meters where voltage >= 170;" ')
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show topics;" ')
os.system(f" /usr/bin/taosadapter --version " )
consumer_dict = {
"group.id": "g1",
"td.connect.websocket.scheme": "ws",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
"enable.auto.commit": "false",
}
consumer = taosws.Consumer(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
consumer = taosws.Consumer(consumer_dict)
try:
consumer.subscribe(["tmq_test_topic"])
consumer.subscribe([select_topic])
except TmqError:
tdLog.exit(f"subscribe error")
first_consumer_rows = 0
while True:
message = consumer.poll(timeout=1.0)
if message:
print("message")
id = message.vgroup()
topic = message.topic()
database = message.database()
for block in message:
nrows = block.nrows()
ncols = block.ncols()
for row in block:
print(row)
values = block.fetchall()
print(nrows, ncols)
consumer.commit(message)
first_consumer_rows += block.nrows()
else:
print("break")
tdLog.notice("message is null and break")
break
consumer.commit(message)
tdLog.debug(f"topic:{select_topic} ,first consumer rows is {first_consumer_rows} in old version")
break
consumer.close()
# consumer_dict2 = {
# "group.id": "g2",
# "td.connect.websocket.scheme": "ws",
# "td.connect.user": "root",
# "td.connect.pass": "taosdata",
# "auto.offset.reset": "earliest",
# "enable.auto.commit": "false",
# }
# consumer = taosws.Consumer(consumer_dict2)
# try:
# consumer.subscribe([db_topic,stable_topic])
# except TmqError:
# tdLog.exit(f"subscribe error")
# first_consumer_rows = 0
# while True:
# message = consumer.poll(timeout=1.0)
# if message:
# for block in message:
# first_consumer_rows += block.nrows()
# else:
# tdLog.notice("message is null and break")
# break
# consumer.commit(message)
# tdLog.debug(f"topic:{select_topic} ,first consumer rows is {first_consumer_rows} in old version")
# break
tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ")
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '")
@ -279,11 +311,10 @@ class TDTestCase:
tdLog.printNoPrefix(f"==========step3:prepare and check data in new version-{nowServerVersion}")
tdsql.query(f"select count(*) from {stb}")
tdsql.checkData(0,0,tableNumbers*recordNumbers1)
# tdsql.query("show streams;")
# os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ")
# tdsql.query("show streams;")
# tdsql.query(f"select count(*) from {stb}")
# tdsql.checkData(0,0,tableNumbers*recordNumbers2)
tdsql.query("show streams;")
tdsql.checkRows(2)
# checkout db4096
tdsql.query("select count(*) from db4096.stb0")
@ -334,7 +365,7 @@ class TDTestCase:
# check stream
tdsql.query("show streams;")
tdsql.checkRows(0)
tdsql.checkRows(2)
#check TS-3131
tdsql.query("select *,tbname from d0.almlog where mcid='m0103';")
@ -348,39 +379,48 @@ class TDTestCase:
print("The unordered list is the same as the ordered list.")
else:
tdLog.exit("The unordered list is not the same as the ordered list.")
tdsql.execute("insert into test.d80 values (now+1s, 11, 103, 0.21);")
tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);")
# check tmq
tdsql.execute("insert into test.d80 values (now+1s, 11, 190, 0.21);")
tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);")
conn = taos.connect()
consumer = Consumer(
{
"group.id": "tg75",
"client.id": "124",
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"enable.auto.commit": "true",
"experimental.snapshot.enable": "true",
}
)
consumer.subscribe(["tmq_test_topic"])
consumer.subscribe([select_topic])
consumer_rows = 0
while True:
res = consumer.poll(10)
if not res:
message = consumer.poll(timeout=1.0)
tdLog.info(f" null {message}")
if message:
for block in message:
consumer_rows += block.nrows()
tdLog.info(f"consumer rows is {consumer_rows}")
else:
print("consumer has completed and break")
break
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
consumer.close()
tdsql.query("select current,voltage,phase from test.meters where voltage >= 170;")
all_rows = tdsql.queryRows
if consumer_rows < all_rows - first_consumer_rows :
tdLog.exit(f"consumer rows is {consumer_rows}, less than {all_rows - first_consumer_rows}")
tdsql.query("show topics;")
tdsql.checkRows(1)
tdsql.checkRows(3)
tdsql.execute(f"drop topic {select_topic};",queryTimes=10)
tdsql.execute(f"drop topic {db_topic};",queryTimes=10)
tdsql.execute(f"drop topic {stable_topic};",queryTimes=10)
os.system(f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ")
tdsql.query(f"select count(*) from {stb}")
tdsql.checkData(0,0,tableNumbers*recordNumbers2)
def stop(self):
tdSql.close()