Merge pull request #24984 from taosdata/test/3.0/TD-28844-1

test: add subscribe and commits in compatibility test
This commit is contained in:
Alex Duan 2024-03-04 16:07:15 +08:00 committed by GitHub
commit f76246ef30
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 76 additions and 9 deletions

View File

@ -22,7 +22,7 @@
"vgroups": 2, "vgroups": 2,
"replica": 1, "replica": 1,
"precision": "ms", "precision": "ms",
"stt_trigger": 8, "stt_trigger": 1,
"minRows": 100, "minRows": 100,
"maxRows": 4096 "maxRows": 4096
}, },

View File

@ -1,11 +1,13 @@
from urllib.parse import uses_relative from urllib.parse import uses_relative
import taos import taos
import taosws
import sys import sys
import os import os
import time import time
import platform import platform
import inspect import inspect
from taos.tmq import Consumer from taos.tmq import Consumer
from taos.tmq import *
from pathlib import Path from pathlib import Path
from util.log import * from util.log import *
@ -17,7 +19,7 @@ from util.dnodes import TDDnode
from util.cluster import * from util.cluster import *
import subprocess import subprocess
BASEVERSION = "3.0.2.3" BASEVERSION = "3.2.0.0"
class TDTestCase: class TDTestCase:
def caseDescription(self): def caseDescription(self):
f''' f'''
@ -30,7 +32,7 @@ class TDTestCase:
self.replicaVar = int(replicaVar) self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor()) tdSql.init(conn.cursor())
self.deletedDataSql= '''drop database if exists deldata;create database deldata duration 300 stt_trigger 4; ;use deldata; self.deletedDataSql= '''drop database if exists deldata;create database deldata duration 300 stt_trigger 1; ;use deldata;
create table deldata.stb1 (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) tags (t1 int); create table deldata.stb1 (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) tags (t1 int);
create table deldata.ct1 using deldata.stb1 tags ( 1 ); create table deldata.ct1 using deldata.stb1 tags ( 1 );
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a ); insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );
@ -104,8 +106,19 @@ class TDTestCase:
print(f"{packageName} has been exists") print(f"{packageName} has been exists")
os.system(f" cd {packagePath} && tar xvf {packageName} && cd {packageTPath} && ./install.sh -e no " ) os.system(f" cd {packagePath} && tar xvf {packageName} && cd {packageTPath} && ./install.sh -e no " )
tdDnodes.stop(1) tdDnodes.stop(1)
print(f"start taosd: rm -rf {dataPath}/* && nohup taosd -c {cPath} & ") print(f"start taosd: rm -rf {dataPath}/* && nohup /usr/bin/taosd -c {cPath} & ")
os.system(f"rm -rf {dataPath}/* && nohup taosd -c {cPath} & " ) os.system(f"rm -rf {dataPath}/* && nohup /usr/bin/taosd -c {cPath} & " )
os.system(f"killall taosadapter" )
os.system(f"cp /etc/taos/taosadapter.toml {cPath}/taosadapter.toml " )
taosadapter_cfg = cPath + "/taosadapter.toml"
taosadapter_log_path = cPath + "/../log/"
print(f"taosadapter_cfg:{taosadapter_cfg},taosadapter_log_path:{taosadapter_log_path} ")
self.alter_string_in_file(taosadapter_cfg,"#path = \"/var/log/taos\"",f"path = \"{taosadapter_log_path}\"")
self.alter_string_in_file(taosadapter_cfg,"taosConfigDir = \"\"",f"taosConfigDir = \"{cPath}\"")
print("/usr/bin/taosadapter --version")
os.system(f" /usr/bin/taosadapter --version" )
print(f" LD_LIBRARY_PATH=/usr/lib -c {taosadapter_cfg} 2>&1 & ")
os.system(f" LD_LIBRARY_PATH=/usr/lib /usr/bin/taosadapter -c {taosadapter_cfg} 2>&1 & " )
sleep(5) sleep(5)
@ -117,6 +130,23 @@ class TDTestCase:
sorted_list = sorted(unordered_list) sorted_list = sorted(unordered_list)
return sorted_list == ordered_list return sorted_list == ordered_list
def alter_string_in_file(self,file,old_str,new_str):
"""
replace str in file
:param file
:param old_str
:param new_str
:return:
"""
file_data = ""
with open(file, "r", encoding="utf-8") as f:
for line in f:
if old_str in line:
line = line.replace(old_str,new_str)
file_data += line
with open(file,"w",encoding="utf-8") as f:
f.write(file_data)
def run(self): def run(self):
scriptsPath = os.path.dirname(os.path.realpath(__file__)) scriptsPath = os.path.dirname(os.path.realpath(__file__))
distro_id = distro.id() distro_id = distro.id()
@ -131,7 +161,7 @@ class TDTestCase:
dbname = "test" dbname = "test"
stb = f"{dbname}.meters" stb = f"{dbname}.meters"
self.installTaosd(bPath,cPath) self.installTaosd(bPath,cPath)
os.system("echo 'debugFlag 143' > /etc/taos/taos.cfg ") # os.system(f"echo 'debugFlag 143' >> {cPath}/taos.cfg ")
tableNumbers=100 tableNumbers=100
recordNumbers1=100 recordNumbers1=100
recordNumbers2=1000 recordNumbers2=1000
@ -163,11 +193,46 @@ class TDTestCase:
# os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ") # os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ")
# os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ') # os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ')
# os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ') # os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ')
os.system(f"sed -i 's/\/etc\/taos/{cPath}/' 0-others/tmqBasic.json ") self.alter_string_in_file("0-others/tmqBasic.json", "/etc/taos/", cPath)
# os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/tmqBasic.json -y ") # os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/tmqBasic.json -y ")
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists tmq_test_topic as select current,voltage,phase from test.meters where voltage <= 106 and current <= 5;" ') os.system('LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists tmq_test_topic as select current,voltage,phase from test.meters where voltage <= 106 and current <= 5;" ')
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show topics;" ') os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show topics;" ')
os.system(f" /usr/bin/taosadapter --version " )
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
}
consumer = taosws.Consumer(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
try:
consumer.subscribe(["tmq_test_topic"])
except TmqError:
tdLog.exit(f"subscribe error")
while True:
message = consumer.poll(timeout=1.0)
if message:
print("message")
id = message.vgroup()
topic = message.topic()
database = message.database()
for block in message:
nrows = block.nrows()
ncols = block.ncols()
for row in block:
print(row)
values = block.fetchall()
print(nrows, ncols)
consumer.commit(message)
else:
print("break")
break
consumer.close()
tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ") tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ")
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y") os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '") os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '")
@ -184,7 +249,8 @@ class TDTestCase:
os.system("pkill taosd") # make sure all the data are saved in disk. os.system("pkill taosd") # make sure all the data are saved in disk.
self.checkProcessPid("taosd") self.checkProcessPid("taosd")
os.system("pkill taosadapter") # make sure all the data are saved in disk.
self.checkProcessPid("taosadapter")
tdLog.printNoPrefix("==========step2:update new version ") tdLog.printNoPrefix("==========step2:update new version ")
self.buildTaosd(bPath) self.buildTaosd(bPath)
@ -193,6 +259,7 @@ class TDTestCase:
tdsql=tdCom.newTdSql() tdsql=tdCom.newTdSql()
print(tdsql) print(tdsql)
cmd = f" LD_LIBRARY_PATH=/usr/lib taos -h localhost ;" cmd = f" LD_LIBRARY_PATH=/usr/lib taos -h localhost ;"
print(os.system(cmd))
if os.system(cmd) == 0: if os.system(cmd) == 0:
raise Exception("failed to execute system command. cmd: %s" % cmd) raise Exception("failed to execute system command. cmd: %s" % cmd)