test: add tmq and stream in compatibility
This commit is contained in:
parent
3986f392b5
commit
91ebee80b0
|
@ -185,8 +185,8 @@ class TDTestCase:
|
|||
# baseVersion = "3.0.1.8"
|
||||
|
||||
tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{BASEVERSION}")
|
||||
tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
|
||||
os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
|
||||
tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -v 1 -y ")
|
||||
os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -v 1 -y ")
|
||||
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database test '")
|
||||
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/com_alltypedata.json -y")
|
||||
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database curdb '")
|
||||
|
@ -196,49 +196,81 @@ class TDTestCase:
|
|||
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'select min(ui) from curdb.meters '")
|
||||
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'select max(bi) from curdb.meters '")
|
||||
|
||||
# os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ")
|
||||
# os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ')
|
||||
# os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ')
|
||||
os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ")
|
||||
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream trigger at_once into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ')
|
||||
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ')
|
||||
|
||||
self.alter_string_in_file("0-others/tmqBasic.json", "/etc/taos/", cPath)
|
||||
# os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/tmqBasic.json -y ")
|
||||
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists tmq_test_topic as select current,voltage,phase from test.meters where voltage <= 106 and current <= 5;" ')
|
||||
# create db/stb/select topic
|
||||
|
||||
db_topic = "db_test_topic"
|
||||
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists {db_topic} with meta as database test" ')
|
||||
|
||||
stable_topic = "stable_test_meters_topic"
|
||||
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists {stable_topic} as stable test.meters where tbname like \\"d3\\";" ')
|
||||
|
||||
select_topic = "select_test_meters_topic"
|
||||
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists {select_topic} as select current,voltage,phase from test.meters where voltage >= 170;" ')
|
||||
|
||||
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show topics;" ')
|
||||
os.system(f" /usr/bin/taosadapter --version " )
|
||||
consumer_dict = {
|
||||
"group.id": "g1",
|
||||
"td.connect.websocket.scheme": "ws",
|
||||
"td.connect.user": "root",
|
||||
"td.connect.pass": "taosdata",
|
||||
"auto.offset.reset": "earliest",
|
||||
"enable.auto.commit": "false",
|
||||
}
|
||||
|
||||
consumer = taosws.Consumer(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
|
||||
consumer = taosws.Consumer(consumer_dict)
|
||||
try:
|
||||
consumer.subscribe(["tmq_test_topic"])
|
||||
consumer.subscribe([select_topic])
|
||||
except TmqError:
|
||||
tdLog.exit(f"subscribe error")
|
||||
|
||||
first_consumer_rows = 0
|
||||
while True:
|
||||
message = consumer.poll(timeout=1.0)
|
||||
if message:
|
||||
print("message")
|
||||
id = message.vgroup()
|
||||
topic = message.topic()
|
||||
database = message.database()
|
||||
|
||||
for block in message:
|
||||
nrows = block.nrows()
|
||||
ncols = block.ncols()
|
||||
for row in block:
|
||||
print(row)
|
||||
values = block.fetchall()
|
||||
print(nrows, ncols)
|
||||
|
||||
consumer.commit(message)
|
||||
first_consumer_rows += block.nrows()
|
||||
else:
|
||||
print("break")
|
||||
tdLog.notice("message is null and break")
|
||||
break
|
||||
consumer.commit(message)
|
||||
tdLog.debug(f"topic:{select_topic} ,first consumer rows is {first_consumer_rows} in old version")
|
||||
break
|
||||
|
||||
consumer.close()
|
||||
# consumer_dict2 = {
|
||||
# "group.id": "g2",
|
||||
# "td.connect.websocket.scheme": "ws",
|
||||
# "td.connect.user": "root",
|
||||
# "td.connect.pass": "taosdata",
|
||||
# "auto.offset.reset": "earliest",
|
||||
# "enable.auto.commit": "false",
|
||||
# }
|
||||
# consumer = taosws.Consumer(consumer_dict2)
|
||||
# try:
|
||||
# consumer.subscribe([db_topic,stable_topic])
|
||||
# except TmqError:
|
||||
# tdLog.exit(f"subscribe error")
|
||||
# first_consumer_rows = 0
|
||||
# while True:
|
||||
# message = consumer.poll(timeout=1.0)
|
||||
# if message:
|
||||
# for block in message:
|
||||
# first_consumer_rows += block.nrows()
|
||||
# else:
|
||||
# tdLog.notice("message is null and break")
|
||||
# break
|
||||
# consumer.commit(message)
|
||||
# tdLog.debug(f"topic:{select_topic} ,first consumer rows is {first_consumer_rows} in old version")
|
||||
# break
|
||||
|
||||
|
||||
|
||||
tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ")
|
||||
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y")
|
||||
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '")
|
||||
|
@ -279,11 +311,10 @@ class TDTestCase:
|
|||
tdLog.printNoPrefix(f"==========step3:prepare and check data in new version-{nowServerVersion}")
|
||||
tdsql.query(f"select count(*) from {stb}")
|
||||
tdsql.checkData(0,0,tableNumbers*recordNumbers1)
|
||||
# tdsql.query("show streams;")
|
||||
# os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ")
|
||||
# tdsql.query("show streams;")
|
||||
# tdsql.query(f"select count(*) from {stb}")
|
||||
# tdsql.checkData(0,0,tableNumbers*recordNumbers2)
|
||||
tdsql.query("show streams;")
|
||||
tdsql.checkRows(2)
|
||||
|
||||
|
||||
|
||||
# checkout db4096
|
||||
tdsql.query("select count(*) from db4096.stb0")
|
||||
|
@ -334,7 +365,7 @@ class TDTestCase:
|
|||
|
||||
# check stream
|
||||
tdsql.query("show streams;")
|
||||
tdsql.checkRows(0)
|
||||
tdsql.checkRows(2)
|
||||
|
||||
#check TS-3131
|
||||
tdsql.query("select *,tbname from d0.almlog where mcid='m0103';")
|
||||
|
@ -348,39 +379,48 @@ class TDTestCase:
|
|||
print("The unordered list is the same as the ordered list.")
|
||||
else:
|
||||
tdLog.exit("The unordered list is not the same as the ordered list.")
|
||||
tdsql.execute("insert into test.d80 values (now+1s, 11, 103, 0.21);")
|
||||
tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);")
|
||||
|
||||
|
||||
# check tmq
|
||||
tdsql.execute("insert into test.d80 values (now+1s, 11, 190, 0.21);")
|
||||
tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);")
|
||||
conn = taos.connect()
|
||||
|
||||
consumer = Consumer(
|
||||
{
|
||||
"group.id": "tg75",
|
||||
"client.id": "124",
|
||||
"group.id": "g1",
|
||||
"td.connect.user": "root",
|
||||
"td.connect.pass": "taosdata",
|
||||
"enable.auto.commit": "true",
|
||||
"experimental.snapshot.enable": "true",
|
||||
}
|
||||
)
|
||||
consumer.subscribe(["tmq_test_topic"])
|
||||
|
||||
consumer.subscribe([select_topic])
|
||||
consumer_rows = 0
|
||||
while True:
|
||||
res = consumer.poll(10)
|
||||
if not res:
|
||||
message = consumer.poll(timeout=1.0)
|
||||
tdLog.info(f" null {message}")
|
||||
if message:
|
||||
for block in message:
|
||||
consumer_rows += block.nrows()
|
||||
tdLog.info(f"consumer rows is {consumer_rows}")
|
||||
else:
|
||||
print("consumer has completed and break")
|
||||
break
|
||||
err = res.error()
|
||||
if err is not None:
|
||||
raise err
|
||||
val = res.value()
|
||||
|
||||
for block in val:
|
||||
print(block.fetchall())
|
||||
consumer.close()
|
||||
tdsql.query("select current,voltage,phase from test.meters where voltage >= 170;")
|
||||
all_rows = tdsql.queryRows
|
||||
if consumer_rows < all_rows - first_consumer_rows :
|
||||
tdLog.exit(f"consumer rows is {consumer_rows}, less than {all_rows - first_consumer_rows}")
|
||||
tdsql.query("show topics;")
|
||||
tdsql.checkRows(1)
|
||||
tdsql.checkRows(3)
|
||||
tdsql.execute(f"drop topic {select_topic};",queryTimes=10)
|
||||
tdsql.execute(f"drop topic {db_topic};",queryTimes=10)
|
||||
tdsql.execute(f"drop topic {stable_topic};",queryTimes=10)
|
||||
|
||||
os.system(f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ")
|
||||
tdsql.query(f"select count(*) from {stb}")
|
||||
tdsql.checkData(0,0,tableNumbers*recordNumbers2)
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
|
|
Loading…
Reference in New Issue