Merge pull request #20291 from taosdata/test/main/TD-20859
test: add stream and topic in compatibility case
This commit is contained in:
commit
40d3f58af8
|
@ -3,6 +3,8 @@ import taos
|
|||
import sys
|
||||
import os
|
||||
import time
|
||||
import inspect
|
||||
from taos.tmq import Consumer
|
||||
|
||||
from pathlib import Path
|
||||
from util.log import *
|
||||
|
@ -99,6 +101,7 @@ class TDTestCase:
|
|||
|
||||
|
||||
def run(self):
|
||||
scriptsPath = os.path.dirname(os.path.realpath(__file__))
|
||||
distro_id = distro.id()
|
||||
if distro_id == "alpine":
|
||||
tdLog.info(f"alpine skip compatibility test")
|
||||
|
@ -128,19 +131,18 @@ class TDTestCase:
|
|||
tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{BASEVERSION}")
|
||||
tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
|
||||
os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
|
||||
sleep(3)
|
||||
os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ")
|
||||
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ')
|
||||
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ')
|
||||
os.system(f"sed -i 's/\/etc\/taos/{cPath}/' 0-others/tmqBasic.json ")
|
||||
# os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/tmqBasic.json -y ")
|
||||
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists tmq_test_topic as select current,voltage,phase from test.meters where voltage <= 106 and current <= 5;" ')
|
||||
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show topics;" ')
|
||||
|
||||
# tdsqlF.query(f"select count(*) from {stb}")
|
||||
# tdsqlF.checkData(0,0,tableNumbers*recordNumbers1)
|
||||
os.system("pkill taosd")
|
||||
self.checkProcessPid("taosd")
|
||||
|
||||
print(f"start taosd: nohup taosd -c {cPath} & ")
|
||||
os.system(f" nohup taosd -c {cPath} & " )
|
||||
sleep(10)
|
||||
tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ")
|
||||
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y")
|
||||
os.system("pkill taosd") # make sure all the data are saved in disk.
|
||||
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '")
|
||||
os.system("pkill taosd") # make sure all the data are saved in disk.
|
||||
self.checkProcessPid("taosd")
|
||||
|
||||
|
||||
|
@ -161,10 +163,12 @@ class TDTestCase:
|
|||
|
||||
tdLog.printNoPrefix(f"==========step3:prepare and check data in new version-{nowServerVersion}")
|
||||
tdsql.query(f"select count(*) from {stb}")
|
||||
tdsql.checkData(0,0,tableNumbers*recordNumbers1)
|
||||
os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ")
|
||||
tdsql.query(f"select count(*) from {stb}")
|
||||
tdsql.checkData(0,0,tableNumbers*recordNumbers2)
|
||||
tdsql.checkData(0,0,tableNumbers*recordNumbers1)
|
||||
# tdsql.query("show streams;")
|
||||
# os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ")
|
||||
# tdsql.query("show streams;")
|
||||
# tdsql.query(f"select count(*) from {stb}")
|
||||
# tdsql.checkData(0,0,tableNumbers*recordNumbers2)
|
||||
tdsql.query(f"select count(*) from db4096.stb0")
|
||||
tdsql.checkData(0,0,50000)
|
||||
|
||||
|
@ -183,13 +187,58 @@ class TDTestCase:
|
|||
tdsql.execute("insert into db.`ct4` using db.stb1 TAGS(4) values(now(),14);")
|
||||
tdsql.query("select * from db.ct4")
|
||||
tdsql.checkData(0,1,14)
|
||||
print(1)
|
||||
tdsql=tdCom.newTdSql()
|
||||
tdsql.query("describe information_schema.ins_databases;")
|
||||
qRows=tdsql.queryRows
|
||||
for i in range(qRows) :
|
||||
if tdsql.queryResult[i][0]=="retentions" :
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
comFlag=True
|
||||
j=0
|
||||
while comFlag:
|
||||
for i in range(qRows) :
|
||||
if tdsql.queryResult[i][0] == "retentions" :
|
||||
print("parameters include retentions")
|
||||
comFlag=False
|
||||
break
|
||||
else :
|
||||
comFlag=True
|
||||
j=j+1
|
||||
if j == qRows:
|
||||
print("parameters don't include retentions")
|
||||
caller = inspect.getframeinfo(inspect.stack()[0][0])
|
||||
args = (caller.filename, caller.lineno)
|
||||
tdLog.exit("%s(%d) failed" % args)
|
||||
tdsql.query("show streams;")
|
||||
tdsql.checkRows(2)
|
||||
tdsql.execute("insert into test.d80 values (now+1s, 11, 103, 0.21);")
|
||||
tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);")
|
||||
|
||||
conn = taos.connect()
|
||||
|
||||
consumer = Consumer(
|
||||
{
|
||||
"group.id": "tg75",
|
||||
"client.id": "124",
|
||||
"td.connect.user": "root",
|
||||
"td.connect.pass": "taosdata",
|
||||
"enable.auto.commit": "true",
|
||||
"experimental.snapshot.enable": "true",
|
||||
}
|
||||
)
|
||||
consumer.subscribe(["tmq_test_topic"])
|
||||
|
||||
while True:
|
||||
res = consumer.poll(10)
|
||||
if not res:
|
||||
break
|
||||
err = res.error()
|
||||
if err is not None:
|
||||
raise err
|
||||
val = res.value()
|
||||
|
||||
for block in val:
|
||||
print(block.fetchall())
|
||||
tdsql.query("show topics;")
|
||||
tdsql.checkRows(1)
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
from taos.tmq import Consumer
|
||||
import taos
|
||||
import taosrest
|
||||
import socket
|
||||
|
||||
|
||||
def init_tmq_env(db, topic):
|
||||
conn = taos.connect()
|
||||
# conn.execute("create dnode test209")
|
||||
# conn.execute("create dnode test216")
|
||||
# conn.execute("create mnode on dnode 2")
|
||||
# conn.execute("create mnode on dnode 3")
|
||||
|
||||
conn.execute("drop topic if exists {}".format(topic))
|
||||
conn.execute("drop database if exists {}".format(db))
|
||||
conn.execute("create database if not exists {} replica 1 ".format(db))
|
||||
conn.select_db(db)
|
||||
conn.execute(
|
||||
"create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))")
|
||||
conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')")
|
||||
conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')")
|
||||
conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')")
|
||||
conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic))
|
||||
conn.execute("insert into tb1 values (now+10s, 1, 1.0, 'tmq test')")
|
||||
conn.execute("insert into tb2 values (now+100s, 2, 2.0, 'tmq test')")
|
||||
conn.execute("insert into tb3 values (now+20s, 3, 3.0, 'tmq test')")
|
||||
conn.execute("insert into tb3 values (now+30s, 4, 4.0, 'tmq test4')")
|
||||
|
||||
def init_tmq_rest_env(db, topic):
|
||||
host = socket.gethostname()
|
||||
conn = taosrest.connect(url=f"http://{host}:6041")
|
||||
|
||||
# conn.execute("create dnode test209")
|
||||
# conn.execute("create dnode test216")
|
||||
# conn.execute("create mnode on dnode 2")
|
||||
# conn.execute("create mnode on dnode 3")
|
||||
|
||||
conn.execute("drop topic if exists {}".format(topic))
|
||||
conn.execute("drop database if exists {}".format(db))
|
||||
conn.execute("create database if not exists {} replica 3 ".format(db))
|
||||
conn.select_db(db)
|
||||
conn.execute(
|
||||
"create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))")
|
||||
conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')")
|
||||
conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')")
|
||||
conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')")
|
||||
conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic))
|
||||
conn.execute("insert into tb1 values (now+10s, 1, 1.0, 'tmq test')")
|
||||
conn.execute("insert into tb2 values (now+100s, 2, 2.0, 'tmq test')")
|
||||
conn.execute("insert into tb3 values (now+20s, 3, 3.0, 'tmq test')")
|
||||
conn.execute("insert into tb3 values (now+30s, 4, 4.0, 'tmq test4')")
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
conn = taos.connect()
|
||||
|
||||
init_tmq_env("tmq_test", "tmq_test_topic") # init env
|
||||
# init_tmq_rest_env("tmq_test", "tmq_test_topic")
|
||||
consumer = Consumer(
|
||||
{
|
||||
"group.id": "tg75",
|
||||
"client.id": "124",
|
||||
"td.connect.user": "root",
|
||||
"td.connect.pass": "taosdata",
|
||||
"enable.auto.commit": "true",
|
||||
"experimental.snapshot.enable": "true",
|
||||
}
|
||||
)
|
||||
consumer.subscribe(["tmq_test_topic"])
|
||||
|
||||
while True:
|
||||
res = consumer.poll(10)
|
||||
if not res:
|
||||
break
|
||||
err = res.error()
|
||||
if err is not None:
|
||||
raise err
|
||||
val = res.value()
|
||||
|
||||
for block in val:
|
||||
print(block.fetchall())
|
|
@ -0,0 +1,24 @@
|
|||
{
|
||||
"filetype": "subscribe",
|
||||
"cfgdir": "/etc/taos",
|
||||
"host": "127.0.0.1",
|
||||
"port": 6030,
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"confirm_parameter_prompt": "no",
|
||||
"tmq_info": {
|
||||
"concurrent": 1,
|
||||
"poll_delay": 10000,
|
||||
"group.id": "grpId_0",
|
||||
"client.id": "clientId",
|
||||
"auto.offset.reset": "earliest",
|
||||
"enable.auto.commit": "true",
|
||||
"auto.commit.interval.ms": 1000,
|
||||
"enable.heartbeat.background": "true",
|
||||
"experimental.snapshot.enable": "true",
|
||||
"msg.with.table.name": "false",
|
||||
"topic_list": [
|
||||
{"name": "tmq_topic_1", "sql": "select current,voltage,phase from test.meters where voltage <= 106 ;"}
|
||||
]
|
||||
}
|
||||
}
|
|
@ -86,7 +86,7 @@ class TDTestCase:
|
|||
self.ctbNums=ctbNums
|
||||
rowNUms=5000
|
||||
ts=1451606400000
|
||||
tdSql.execute(f"create database {dbname};")
|
||||
tdSql.execute(f"create database {dbname} cachemodel 'both';")
|
||||
tdSql.execute(f"use {dbname} ")
|
||||
tdSql.execute(f'''
|
||||
create table {stabname1} (ts timestamp,latitude double,longitude double,elevation double,velocity double,heading double,grade double,fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30),load_capacity double,fuel_capacity double,nominal_fuel_consumption double);
|
||||
|
|
Loading…
Reference in New Issue