Merge pull request #28724 from taosdata/test/main/TD-32816

test:add stream and query test cases
This commit is contained in:
Shengliang Guan 2024-11-19 10:17:09 +08:00 committed by GitHub
commit e5fe064034
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 387 additions and 10 deletions

View File

@ -36,14 +36,15 @@ static void smProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
dTrace("msg:%p, get from snode-write queue", pMsg);
int32_t code = sndProcessWriteMsg(pMgmt->pSnode, pMsg, NULL);
if (code < 0) {
dGError("snd, msg:%p failed to process write since %s", pMsg, tstrerror(code));
if (pMsg->info.handle != NULL) {
tmsgSendRsp(pMsg);
}
} else {
smSendRsp(pMsg, 0);
}
// if (code < 0) {
// dGError("snd, msg:%p failed to process write since %s", pMsg, tstrerror(code));
// if (pMsg->info.handle != NULL) {
// tmsgSendRsp(pMsg);
// }
// } else {
// smSendRsp(pMsg, 0);
// }
smSendRsp(pMsg, code);
dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pMsg->pCont);

View File

@ -230,6 +230,14 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/agg_group_NotReturnValue.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-32548.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stddev_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stddev_test.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stddev_test.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stddev_test.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/checkpoint_info.py -N 4
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/checkpoint_info2.py -N 4
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_multi_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 3-enterprise/restore/restoreDnode.py -N 5 -M 3 -i False
,,y,system-test,./pytest.sh python3 ./test.py -f 3-enterprise/restore/restoreVnode.py -N 5 -M 3 -i False
,,y,system-test,./pytest.sh python3 ./test.py -f 3-enterprise/restore/restoreMnode.py -N 5 -M 3 -i False

View File

@ -843,9 +843,10 @@ class TDSql:
tdSql.query("select * from information_schema.ins_vnodes")
#result: dnode_id|vgroup_id|db_name|status|role_time|start_time|restored|
results = list(tdSql.queryResult)
for vnode_group_id in db_vgroups_list:
print(tdSql.queryResult)
for result in tdSql.queryResult:
for result in results:
print(f'result[2] is {result[2]}, db_name is {db_name}, result[1] is {result[1]}, vnode_group_id is {vnode_group_id}')
if result[2] == db_name and result[1] == vnode_group_id:
tdLog.debug(f"dbname: {db_name}, vgroup :{vnode_group_id}, dnode is {result[0]}")
print(useful_trans_dnodes_list)

View File

@ -0,0 +1,32 @@
from util.sql import *
from util.common import *
import taos
taos.taos_connect
class TDTestCase:
def init(self, conn, logSql, replicaVar = 1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
self.conn = conn
tdSql.init(conn.cursor(), logSql)
def initdb(self):
tdSql.execute("drop database if exists d0")
tdSql.execute("create database d0")
tdSql.execute("use d0")
tdSql.execute("create stable stb0 (ts timestamp, w_ts timestamp, opc nchar(100), quality int) tags(t0 int)")
tdSql.execute("create table t0 using stb0 tags(1)")
tdSql.execute("create table t1 using stb0 tags(2)")
def multi_insert(self):
for i in range(5):
tdSql.execute(f"insert into t1 values(1721265436000, now() + {i + 1}s, '0', 12) t1(opc, quality, ts) values ('opc2', 192, now()+ {i + 2}s) t1(ts, opc, quality) values(now() + {i + 3}s, 'opc4', 10) t1 values(1721265436000, now() + {i + 4}s, '1', 191) t1(opc, quality, ts) values('opc5', 192, now() + {i + 5}s) t1 values(now(), now() + {i + 6}s, '2', 192)")
tdSql.execute("insert into t0 values(1721265436000,now(),'0',192) t0(quality,w_ts,ts) values(192,now(),1721265326000) t0(quality,w_t\
s,ts) values(190,now()+1s,1721265326000) t0 values(1721265436000,now()+2s,'1',191) t0(quality,w_ts,ts) values(192,now()+3s,\
1721265326002) t0(ts,w_ts,opc,quality) values(1721265436003,now()+4s,'3',193) t0 values(now(), now() + 4s , '2', 192)")
def run(self):
self.initdb()
self.multi_insert()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,54 @@
import numpy as np
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
'''
Test case for TS-5150
'''
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.ts = 1537146000000
def initdabase(self):
tdSql.execute('create database if not exists db_test vgroups 2 buffer 10')
tdSql.execute('use db_test')
tdSql.execute('create stable stb(ts timestamp, delay int) tags(groupid int)')
tdSql.execute('create table t1 using stb tags(1)')
tdSql.execute('create table t2 using stb tags(2)')
tdSql.execute('create table t3 using stb tags(3)')
tdSql.execute('create table t4 using stb tags(4)')
tdSql.execute('create table t5 using stb tags(5)')
tdSql.execute('create table t6 using stb tags(6)')
def insert_data(self):
for i in range(5000):
tdSql.execute(f"insert into t1 values({self.ts + i * 1000}, {i%5})")
tdSql.execute(f"insert into t2 values({self.ts + i * 1000}, {i%5})")
tdSql.execute(f"insert into t3 values({self.ts + i * 1000}, {i%5})")
def verify_stddev(self):
for i in range(20):
tdSql.query(f'SELECT MAX(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS maxDelay,\
MIN(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS minDelay,\
AVG(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS avgDelay,\
STDDEV(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS jitter,\
COUNT(CASE WHEN delay = 0 THEN 1 ELSE NULL END) AS timeoutCount,\
COUNT(*) AS totalCount from stb where ts between {1537146000000 + i * 1000} and {1537146000000 + (i+10) * 1000}')
res = tdSql.queryResult[0][3]
assert res > 0.8
def run(self):
self.initdabase()
self.insert_data()
self.verify_stddev()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,140 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
from util.cluster import *
import threading
# should be used by -N option
class TDTestCase:
#updatecfgDict = {'checkpointInterval': 60 ,}
def init(self, conn, logSql, replicaVar=1):
print("========init========")
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def find_checkpoint_info_file(self, dirpath, checkpointid, task_id):
for root, dirs, files in os.walk(dirpath):
if f'checkpoint{checkpointid}' in dirs:
info_path = os.path.join(root, f'checkpoint{checkpointid}', 'info')
if os.path.exists(info_path):
if task_id in info_path:
return info_path
else:
continue
else:
return None
def get_dnode_info(self):
'''
get a dict from vnode to dnode
'''
self.vnode_dict = {}
sql = 'select dnode_id, vgroup_id from information_schema.ins_vnodes'
result = tdSql.getResult(sql)
for (dnode,vnode) in result:
self.vnode_dict[vnode] = dnode
def print_time_info(self):
'''
sometimes, we need to wait for a while to check the info (for example, the checkpoint info file won't be created immediately after the redistribute)
'''
times= 0
while(True):
if(self.check_info()):
tdLog.success(f'Time to finish is {times}')
return
else:
if times > 200:
tdLog.exit("time out")
times += 10
time.sleep(10)
def check_info(self):
'''
first, check if the vnode is restored
'''
while(True):
if(self.check_vnodestate()):
break
sql = 'select task_id, node_id, checkpoint_id, checkpoint_ver from information_schema.ins_stream_tasks where `level` = "source" or `level` = "agg" and node_type == "vnode"'
for task_id, vnode, checkpoint_id, checkpoint_ver in tdSql.getResult(sql):
dirpath = f"{cluster.dnodes[self.vnode_dict[vnode]-1].dataDir}/vnode/vnode{vnode}/"
info_path = self.find_checkpoint_info_file(dirpath, checkpoint_id, task_id)
if info_path is None:
return False
with open(info_path, 'r') as f:
info_id, info_ver = f.read().split()
if int(info_id) != int(checkpoint_id) or int(info_ver) != int(checkpoint_ver):
return False
return True
def restart_stream(self):
tdLog.debug("========restart stream========")
time.sleep(10)
for i in range(5):
tdSql.execute("pause stream s1")
time.sleep(2)
tdSql.execute("resume stream s1")
def initstream(self):
tdLog.debug("========case1 start========")
os.system("nohup taosBenchmark -y -B 1 -t 4 -S 500 -n 1000 -v 3 > /dev/null 2>&1 &")
time.sleep(5)
tdSql.execute("create snode on dnode 1")
tdSql.execute("use test")
tdSql.execute("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(1s)")
tdLog.debug("========create stream using snode and insert data ok========")
self.get_dnode_info()
def redistribute_vnode(self):
tdLog.debug("========redistribute vnode========")
tdSql.redistribute_db_all_vgroups()
self.get_dnode_info()
def replicate_db(self):
tdLog.debug("========replicate db========")
while True:
res = tdSql.getResult("SHOW TRANSACTIONS")
if res == []:
tdLog.debug("========== no transaction, begin to replicate db =========")
tdSql.execute("alter database test replica 3")
return
else:
time.sleep(5)
continue
def check_vnodestate(self):
sql = 'select distinct restored from information_schema.ins_vnodes'
if tdSql.getResult(sql) != [(True,)]:
tdLog.debug(f"vnode not restored, wait 5s")
time.sleep(5)
return False
else:
return True
def run(self):
print("========run========")
self.initstream()
self.restart_stream()
time.sleep(60)
self.print_time_info()
self.redistribute_vnode()
self.restart_stream()
time.sleep(60)
self.print_time_info()
def stop(self):
print("========stop========")
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,141 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
from util.cluster import *
# should be used by -N option
class TDTestCase:
updatecfgDict = {'checkpointInterval': 60 ,
}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), True)
def find_checkpoint_info_file(self, dirpath, checkpointid, task_id):
for root, dirs, files in os.walk(dirpath):
if f'checkpoint{checkpointid}' in dirs:
info_path = os.path.join(root, f'checkpoint{checkpointid}', 'info')
if os.path.exists(info_path):
if task_id in info_path:
tdLog.info(f"info file found in {info_path}")
return info_path
else:
continue
else:
tdLog.info(f"info file not found in {info_path}")
return None
else:
tdLog.info(f"no checkpoint{checkpointid} in {dirpath}")
def get_dnode_info(self):
'''
get a dict from vnode to dnode
'''
self.vnode_dict = {}
sql = 'select dnode_id, vgroup_id from information_schema.ins_vnodes where status = "leader"'
result = tdSql.getResult(sql)
for (dnode,vnode) in result:
self.vnode_dict[vnode] = dnode
def print_time_info(self):
'''
sometimes, we need to wait for a while to check the info (for example, the checkpoint info file won't be created immediately after the redistribute)
'''
times= 0
while(True):
if(self.check_info()):
tdLog.success(f'Time to finish is {times}')
return
else:
if times > 400:
tdLog.exit("time out")
times += 10
time.sleep(10)
def check_info(self):
'''
first, check if the vnode is restored
'''
while(True):
if(self.check_vnodestate()):
break
self.get_dnode_info()
sql = 'select task_id, node_id, checkpoint_id, checkpoint_ver from information_schema.ins_stream_tasks where `level` = "source" or `level` = "agg" and node_type == "vnode"'
for task_id, vnode, checkpoint_id, checkpoint_ver in tdSql.getResult(sql):
dirpath = f"{cluster.dnodes[self.vnode_dict[vnode]-1].dataDir}/vnode/vnode{vnode}/"
info_path = self.find_checkpoint_info_file(dirpath, checkpoint_id, task_id)
if info_path is None:
tdLog.info(f"info path: {dirpath} is null")
return False
with open(info_path, 'r') as f:
info_id, info_ver = f.read().split()
if int(info_id) != int(checkpoint_id) or int(info_ver) != int(checkpoint_ver):
tdLog.info(f"infoId: {info_id}, checkpointId: {checkpoint_id}, infoVer: {info_ver}, checkpointVer: {checkpoint_ver}")
return False
return True
def restart_stream(self):
tdLog.debug("========restart stream========")
for i in range(5):
tdSql.execute("pause stream s1")
time.sleep(2)
tdSql.execute("resume stream s1")
def initstream(self):
tdLog.debug("========case1 start========")
os.system("nohup taosBenchmark -y -B 1 -t 4 -S 500 -n 1000 -v 3 > /dev/null 2>&1 &")
time.sleep(5)
tdSql.execute("create snode on dnode 1")
tdSql.execute("use test")
tdSql.execute("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(1s)")
tdLog.debug("========create stream using snode and insert data ok========")
self.get_dnode_info()
def redistribute_vnode(self):
tdLog.debug("========redistribute vnode========")
tdSql.redistribute_db_all_vgroups()
self.get_dnode_info()
def replicate_db(self):
tdLog.debug("========replicate db========")
while True:
res = tdSql.getResult("SHOW TRANSACTIONS")
if res == []:
tdLog.debug("========== no transaction, begin to replicate db =========")
tdSql.execute("alter database test replica 3")
return
else:
time.sleep(5)
continue
def check_vnodestate(self):
sql = 'select distinct restored from information_schema.ins_vnodes'
if tdSql.getResult(sql) != [(True,)]:
tdLog.debug(f"vnode not restored, wait 5s")
time.sleep(5)
return False
else:
return True
def run(self):
self.initstream()
self.replicate_db()
self.print_time_info()
self.restart_stream()
time.sleep(60)
self.print_time_info()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())