test: add stream and topic in compatibility case

This commit is contained in:
chenhaoran 2023-03-07 11:53:45 +08:00
parent 325b3439df
commit e1be1c8d0f
3 changed files with 154 additions and 22 deletions

View File

@ -41,7 +41,7 @@
"interlace_rows": 0, "interlace_rows": 0,
"line_protocol": null, "line_protocol": null,
"tcp_transfer": "no", "tcp_transfer": "no",
"insert_rows": 10000, "insert_rows": 1000,
"childtable_limit": 0, "childtable_limit": 0,
"childtable_offset": 0, "childtable_offset": 0,
"rows_per_tbl": 0, "rows_per_tbl": 0,

View File

@ -3,6 +3,8 @@ import taos
import sys import sys
import os import os
import time import time
import inspect
from taos.tmq import Consumer
from pathlib import Path from pathlib import Path
from util.log import * from util.log import *
@ -14,7 +16,7 @@ from util.dnodes import TDDnode
from util.cluster import * from util.cluster import *
import subprocess import subprocess
BASEVERSION = "3.0.1.8" BASEVERSION = "3.0.2.5"
class TDTestCase: class TDTestCase:
def caseDescription(self): def caseDescription(self):
''' '''
@ -99,6 +101,7 @@ class TDTestCase:
def run(self): def run(self):
scriptsPath = os.path.dirname(os.path.realpath(__file__))
distro_id = distro.id() distro_id = distro.id()
if distro_id == "alpine": if distro_id == "alpine":
tdLog.info(f"alpine skip compatibility test") tdLog.info(f"alpine skip compatibility test")
@ -128,18 +131,17 @@ class TDTestCase:
tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{BASEVERSION}") tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{BASEVERSION}")
tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
sleep(3) os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ")
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ')
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ')
os.system(f"cd {scriptsPath} && python3 testRoll.py")
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show topics;" ')
# tdsqlF.query(f"select count(*) from {stb}") # print(f"start taosd: nohup taosd -c {cPath} & ")
# tdsqlF.checkData(0,0,tableNumbers*recordNumbers1) # os.system(f" nohup taosd -c {cPath} & " )
os.system("pkill taosd")
self.checkProcessPid("taosd")
print(f"start taosd: nohup taosd -c {cPath} & ")
os.system(f" nohup taosd -c {cPath} & " )
sleep(10)
tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ") tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ")
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y") os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '")
os.system("pkill taosd") # make sure all the data are saved in disk. os.system("pkill taosd") # make sure all the data are saved in disk.
self.checkProcessPid("taosd") self.checkProcessPid("taosd")
@ -162,11 +164,13 @@ class TDTestCase:
tdLog.printNoPrefix(f"==========step3:prepare and check data in new version-{nowServerVersion}") tdLog.printNoPrefix(f"==========step3:prepare and check data in new version-{nowServerVersion}")
tdsql.query(f"select count(*) from {stb}") tdsql.query(f"select count(*) from {stb}")
tdsql.checkData(0,0,tableNumbers*recordNumbers1) tdsql.checkData(0,0,tableNumbers*recordNumbers1)
os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ") # tdsql.query("show streams;")
tdsql.query(f"select count(*) from {stb}") # os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ")
tdsql.checkData(0,0,tableNumbers*recordNumbers2) # tdsql.query("show streams;")
# tdsql.query(f"select count(*) from {stb}")
# tdsql.checkData(0,0,tableNumbers*recordNumbers2)
tdsql.query(f"select count(*) from db4096.stb0") tdsql.query(f"select count(*) from db4096.stb0")
tdsql.checkData(0,0,50000) tdsql.checkData(0,0,5000)
tdsql=tdCom.newTdSql() tdsql=tdCom.newTdSql()
tdLog.printNoPrefix(f"==========step4:verify backticks in taos Sql-TD18542") tdLog.printNoPrefix(f"==========step4:verify backticks in taos Sql-TD18542")
@ -183,13 +187,58 @@ class TDTestCase:
tdsql.execute("insert into db.`ct4` using db.stb1 TAGS(4) values(now(),14);") tdsql.execute("insert into db.`ct4` using db.stb1 TAGS(4) values(now(),14);")
tdsql.query("select * from db.ct4") tdsql.query("select * from db.ct4")
tdsql.checkData(0,1,14) tdsql.checkData(0,1,14)
print(1)
tdsql=tdCom.newTdSql()
tdsql.query("describe information_schema.ins_databases;") tdsql.query("describe information_schema.ins_databases;")
qRows=tdsql.queryRows qRows=tdsql.queryRows
comFlag=True
j=0
while comFlag:
for i in range(qRows) : for i in range(qRows) :
if tdsql.queryResult[i][0]=="retentions" : if tdsql.queryResult[i][0] == "retentions" :
return True print("parameters include retentions")
else: comFlag=False
return False break
else :
comFlag=True
j=j+1
if j == qRows:
print("parameters don't include retentions")
caller = inspect.getframeinfo(inspect.stack()[0][0])
args = (caller.filename, caller.lineno)
tdLog.exit("%s(%d) failed" % args)
tdsql.query("show streams;")
tdsql.checkRows(2)
tdsql.execute("insert into tmq_test.tb1 values (now, 11, 3.0, 'tmq test1');")
tdsql.execute("insert into tmq_test.tb2 values (now, 22, 3.0, 'tmq test2');")
conn = taos.connect()
consumer = Consumer(
{
"group.id": "tg75",
"client.id": "124",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"enable.auto.commit": "true",
"experimental.snapshot.enable": "true",
}
)
consumer.subscribe(["tmq_test_topic"])
while True:
res = consumer.poll(10)
if not res:
break
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
tdsql.query("show topics;")
tdsql.checkRows(1)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success(f"{__file__} successfully executed") tdLog.success(f"{__file__} successfully executed")

View File

@ -0,0 +1,83 @@
from taos.tmq import Consumer
import taos
import taosrest
import socket
def init_tmq_env(db, topic):
conn = taos.connect()
# conn.execute("create dnode test209")
# conn.execute("create dnode test216")
# conn.execute("create mnode on dnode 2")
# conn.execute("create mnode on dnode 3")
conn.execute("drop topic if exists {}".format(topic))
conn.execute("drop database if exists {}".format(db))
conn.execute("create database if not exists {} replica 1 ".format(db))
conn.select_db(db)
conn.execute(
"create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))")
conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')")
conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')")
conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')")
conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic))
conn.execute("insert into tb1 values (now+10s, 1, 1.0, 'tmq test')")
conn.execute("insert into tb2 values (now+100s, 2, 2.0, 'tmq test')")
conn.execute("insert into tb3 values (now+20s, 3, 3.0, 'tmq test')")
conn.execute("insert into tb3 values (now+30s, 4, 4.0, 'tmq test4')")
def init_tmq_rest_env(db, topic):
host = socket.gethostname()
conn = taosrest.connect(url=f"http://{host}:6041")
# conn.execute("create dnode test209")
# conn.execute("create dnode test216")
# conn.execute("create mnode on dnode 2")
# conn.execute("create mnode on dnode 3")
conn.execute("drop topic if exists {}".format(topic))
conn.execute("drop database if exists {}".format(db))
conn.execute("create database if not exists {} replica 3 ".format(db))
conn.select_db(db)
conn.execute(
"create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))")
conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')")
conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')")
conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')")
conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic))
conn.execute("insert into tb1 values (now+10s, 1, 1.0, 'tmq test')")
conn.execute("insert into tb2 values (now+100s, 2, 2.0, 'tmq test')")
conn.execute("insert into tb3 values (now+20s, 3, 3.0, 'tmq test')")
conn.execute("insert into tb3 values (now+30s, 4, 4.0, 'tmq test4')")
if __name__ == '__main__':
conn = taos.connect()
init_tmq_env("tmq_test", "tmq_test_topic") # init env
# init_tmq_rest_env("tmq_test", "tmq_test_topic")
consumer = Consumer(
{
"group.id": "tg75",
"client.id": "124",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"enable.auto.commit": "true",
"experimental.snapshot.enable": "true",
}
)
consumer.subscribe(["tmq_test_topic"])
while True:
res = consumer.poll(10)
if not res:
break
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())