diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index 8c33c5bb4b..1e882fc656 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -36,14 +36,15 @@ static void smProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO dTrace("msg:%p, get from snode-write queue", pMsg); int32_t code = sndProcessWriteMsg(pMgmt->pSnode, pMsg, NULL); - if (code < 0) { - dGError("snd, msg:%p failed to process write since %s", pMsg, tstrerror(code)); - if (pMsg->info.handle != NULL) { - tmsgSendRsp(pMsg); - } - } else { - smSendRsp(pMsg, 0); - } + // if (code < 0) { + // dGError("snd, msg:%p failed to process write since %s", pMsg, tstrerror(code)); + // if (pMsg->info.handle != NULL) { + // tmsgSendRsp(pMsg); + // } + // } else { + // smSendRsp(pMsg, 0); + // } + smSendRsp(pMsg, code); dTrace("msg:%p, is freed", pMsg); rpcFreeCont(pMsg->pCont); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index ed842f6463..29233bd56b 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -230,6 +230,14 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/agg_group_NotReturnValue.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-32548.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stddev_test.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stddev_test.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stddev_test.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stddev_test.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/checkpoint_info.py -N 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/checkpoint_info2.py -N 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_multi_insert.py + ,,y,system-test,./pytest.sh python3 ./test.py -f 3-enterprise/restore/restoreDnode.py -N 5 -M 3 -i False ,,y,system-test,./pytest.sh python3 ./test.py -f 3-enterprise/restore/restoreVnode.py -N 5 -M 3 -i False ,,y,system-test,./pytest.sh python3 ./test.py -f 3-enterprise/restore/restoreMnode.py -N 5 -M 3 -i False diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 1d3333264a..46b7e1f795 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -843,9 +843,10 @@ class TDSql: tdSql.query("select * from information_schema.ins_vnodes") #result: dnode_id|vgroup_id|db_name|status|role_time|start_time|restored| + results = list(tdSql.queryResult) for vnode_group_id in db_vgroups_list: - print(tdSql.queryResult) - for result in tdSql.queryResult: + for result in results: + print(f'result[2] is {result[2]}, db_name is {db_name}, result[1] is {result[1]}, vnode_group_id is {vnode_group_id}') if result[2] == db_name and result[1] == vnode_group_id: tdLog.debug(f"dbname: {db_name}, vgroup :{vnode_group_id}, dnode is {result[0]}") print(useful_trans_dnodes_list) diff --git a/tests/system-test/1-insert/test_multi_insert.py b/tests/system-test/1-insert/test_multi_insert.py new file mode 100644 index 0000000000..d1b6d28ffd --- /dev/null +++ b/tests/system-test/1-insert/test_multi_insert.py @@ -0,0 +1,32 @@ +from util.sql import * +from util.common import * +import taos +taos.taos_connect +class TDTestCase: + def init(self, conn, logSql, replicaVar = 1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + self.conn = conn + tdSql.init(conn.cursor(), logSql) + def initdb(self): + tdSql.execute("drop database if exists d0") + tdSql.execute("create database d0") + tdSql.execute("use d0") + tdSql.execute("create stable stb0 (ts timestamp, w_ts timestamp, opc nchar(100), quality int) tags(t0 int)") + tdSql.execute("create table t0 using stb0 tags(1)") + tdSql.execute("create table t1 using stb0 tags(2)") + def multi_insert(self): + for i in range(5): + tdSql.execute(f"insert into t1 values(1721265436000, now() + {i + 1}s, '0', 12) t1(opc, quality, ts) values ('opc2', 192, now()+ {i + 2}s) t1(ts, opc, quality) values(now() + {i + 3}s, 'opc4', 10) t1 values(1721265436000, now() + {i + 4}s, '1', 191) t1(opc, quality, ts) values('opc5', 192, now() + {i + 5}s) t1 values(now(), now() + {i + 6}s, '2', 192)") + tdSql.execute("insert into t0 values(1721265436000,now(),'0',192) t0(quality,w_ts,ts) values(192,now(),1721265326000) t0(quality,w_t\ +s,ts) values(190,now()+1s,1721265326000) t0 values(1721265436000,now()+2s,'1',191) t0(quality,w_ts,ts) values(192,now()+3s,\ +1721265326002) t0(ts,w_ts,opc,quality) values(1721265436003,now()+4s,'3',193) t0 values(now(), now() + 4s , '2', 192)") + def run(self): + self.initdb() + self.multi_insert() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/2-query/stddev_test.py b/tests/system-test/2-query/stddev_test.py new file mode 100644 index 0000000000..c0cb51fe57 --- /dev/null +++ b/tests/system-test/2-query/stddev_test.py @@ -0,0 +1,54 @@ +import numpy as np +from util.log import * +from util.cases import * +from util.sql import * +from util.common import * +from util.sqlset import * + +''' +Test case for TS-5150 +''' +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + self.ts = 1537146000000 + def initdabase(self): + tdSql.execute('create database if not exists db_test vgroups 2 buffer 10') + tdSql.execute('use db_test') + tdSql.execute('create stable stb(ts timestamp, delay int) tags(groupid int)') + tdSql.execute('create table t1 using stb tags(1)') + tdSql.execute('create table t2 using stb tags(2)') + tdSql.execute('create table t3 using stb tags(3)') + tdSql.execute('create table t4 using stb tags(4)') + tdSql.execute('create table t5 using stb tags(5)') + tdSql.execute('create table t6 using stb tags(6)') + def insert_data(self): + for i in range(5000): + tdSql.execute(f"insert into t1 values({self.ts + i * 1000}, {i%5})") + tdSql.execute(f"insert into t2 values({self.ts + i * 1000}, {i%5})") + tdSql.execute(f"insert into t3 values({self.ts + i * 1000}, {i%5})") + + def verify_stddev(self): + for i in range(20): + tdSql.query(f'SELECT MAX(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS maxDelay,\ + MIN(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS minDelay,\ + AVG(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS avgDelay,\ + STDDEV(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS jitter,\ + COUNT(CASE WHEN delay = 0 THEN 1 ELSE NULL END) AS timeoutCount,\ + COUNT(*) AS totalCount from stb where ts between {1537146000000 + i * 1000} and {1537146000000 + (i+10) * 1000}') + res = tdSql.queryResult[0][3] + assert res > 0.8 + def run(self): + self.initdabase() + self.insert_data() + self.verify_stddev() + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) + diff --git a/tests/system-test/8-stream/checkpoint_info.py b/tests/system-test/8-stream/checkpoint_info.py new file mode 100644 index 0000000000..522017a702 --- /dev/null +++ b/tests/system-test/8-stream/checkpoint_info.py @@ -0,0 +1,140 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + + +from util.log import * +from util.cases import * +from util.sql import * +from util.common import * +from util.sqlset import * +from util.cluster import * +import threading +# should be used by -N option +class TDTestCase: + + #updatecfgDict = {'checkpointInterval': 60 ,} + def init(self, conn, logSql, replicaVar=1): + print("========init========") + + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + def find_checkpoint_info_file(self, dirpath, checkpointid, task_id): + for root, dirs, files in os.walk(dirpath): + if f'checkpoint{checkpointid}' in dirs: + info_path = os.path.join(root, f'checkpoint{checkpointid}', 'info') + if os.path.exists(info_path): + if task_id in info_path: + return info_path + else: + continue + else: + return None + def get_dnode_info(self): + ''' + get a dict from vnode to dnode + ''' + self.vnode_dict = {} + sql = 'select dnode_id, vgroup_id from information_schema.ins_vnodes' + result = tdSql.getResult(sql) + for (dnode,vnode) in result: + self.vnode_dict[vnode] = dnode + def print_time_info(self): + ''' + sometimes, we need to wait for a while to check the info (for example, the checkpoint info file won't be created immediately after the redistribute) + ''' + times= 0 + while(True): + if(self.check_info()): + tdLog.success(f'Time to finish is {times}') + return + else: + if times > 200: + tdLog.exit("time out") + times += 10 + time.sleep(10) + def check_info(self): + ''' + first, check if the vnode is restored + ''' + while(True): + if(self.check_vnodestate()): + break + sql = 'select task_id, node_id, checkpoint_id, checkpoint_ver from information_schema.ins_stream_tasks where `level` = "source" or `level` = "agg" and node_type == "vnode"' + for task_id, vnode, checkpoint_id, checkpoint_ver in tdSql.getResult(sql): + dirpath = f"{cluster.dnodes[self.vnode_dict[vnode]-1].dataDir}/vnode/vnode{vnode}/" + info_path = self.find_checkpoint_info_file(dirpath, checkpoint_id, task_id) + if info_path is None: + return False + with open(info_path, 'r') as f: + info_id, info_ver = f.read().split() + if int(info_id) != int(checkpoint_id) or int(info_ver) != int(checkpoint_ver): + return False + return True + + def restart_stream(self): + tdLog.debug("========restart stream========") + time.sleep(10) + for i in range(5): + tdSql.execute("pause stream s1") + time.sleep(2) + tdSql.execute("resume stream s1") + def initstream(self): + tdLog.debug("========case1 start========") + os.system("nohup taosBenchmark -y -B 1 -t 4 -S 500 -n 1000 -v 3 > /dev/null 2>&1 &") + time.sleep(5) + tdSql.execute("create snode on dnode 1") + tdSql.execute("use test") + tdSql.execute("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(1s)") + tdLog.debug("========create stream using snode and insert data ok========") + self.get_dnode_info() + def redistribute_vnode(self): + tdLog.debug("========redistribute vnode========") + tdSql.redistribute_db_all_vgroups() + self.get_dnode_info() + def replicate_db(self): + tdLog.debug("========replicate db========") + while True: + res = tdSql.getResult("SHOW TRANSACTIONS") + if res == []: + tdLog.debug("========== no transaction, begin to replicate db =========") + tdSql.execute("alter database test replica 3") + return + else: + time.sleep(5) + continue + def check_vnodestate(self): + sql = 'select distinct restored from information_schema.ins_vnodes' + if tdSql.getResult(sql) != [(True,)]: + tdLog.debug(f"vnode not restored, wait 5s") + time.sleep(5) + return False + else: + return True + def run(self): + print("========run========") + self.initstream() + self.restart_stream() + time.sleep(60) + self.print_time_info() + self.redistribute_vnode() + self.restart_stream() + time.sleep(60) + self.print_time_info() + + def stop(self): + print("========stop========") + tdSql.close() + tdLog.success(f"{__file__} successfully executed") +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/8-stream/checkpoint_info2.py b/tests/system-test/8-stream/checkpoint_info2.py new file mode 100644 index 0000000000..3dc57477f7 --- /dev/null +++ b/tests/system-test/8-stream/checkpoint_info2.py @@ -0,0 +1,141 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + + +from util.log import * +from util.cases import * +from util.sql import * +from util.common import * +from util.sqlset import * +from util.cluster import * + +# should be used by -N option +class TDTestCase: + updatecfgDict = {'checkpointInterval': 60 , + } + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), True) + def find_checkpoint_info_file(self, dirpath, checkpointid, task_id): + for root, dirs, files in os.walk(dirpath): + if f'checkpoint{checkpointid}' in dirs: + info_path = os.path.join(root, f'checkpoint{checkpointid}', 'info') + if os.path.exists(info_path): + if task_id in info_path: + tdLog.info(f"info file found in {info_path}") + return info_path + else: + continue + else: + tdLog.info(f"info file not found in {info_path}") + return None + else: + tdLog.info(f"no checkpoint{checkpointid} in {dirpath}") + def get_dnode_info(self): + ''' + get a dict from vnode to dnode + ''' + self.vnode_dict = {} + sql = 'select dnode_id, vgroup_id from information_schema.ins_vnodes where status = "leader"' + result = tdSql.getResult(sql) + for (dnode,vnode) in result: + self.vnode_dict[vnode] = dnode + def print_time_info(self): + ''' + sometimes, we need to wait for a while to check the info (for example, the checkpoint info file won't be created immediately after the redistribute) + ''' + times= 0 + while(True): + if(self.check_info()): + tdLog.success(f'Time to finish is {times}') + return + else: + if times > 400: + tdLog.exit("time out") + times += 10 + time.sleep(10) + def check_info(self): + ''' + first, check if the vnode is restored + ''' + while(True): + if(self.check_vnodestate()): + break + self.get_dnode_info() + sql = 'select task_id, node_id, checkpoint_id, checkpoint_ver from information_schema.ins_stream_tasks where `level` = "source" or `level` = "agg" and node_type == "vnode"' + for task_id, vnode, checkpoint_id, checkpoint_ver in tdSql.getResult(sql): + dirpath = f"{cluster.dnodes[self.vnode_dict[vnode]-1].dataDir}/vnode/vnode{vnode}/" + info_path = self.find_checkpoint_info_file(dirpath, checkpoint_id, task_id) + if info_path is None: + tdLog.info(f"info path: {dirpath} is null") + return False + with open(info_path, 'r') as f: + info_id, info_ver = f.read().split() + if int(info_id) != int(checkpoint_id) or int(info_ver) != int(checkpoint_ver): + tdLog.info(f"infoId: {info_id}, checkpointId: {checkpoint_id}, infoVer: {info_ver}, checkpointVer: {checkpoint_ver}") + return False + return True + + def restart_stream(self): + tdLog.debug("========restart stream========") + for i in range(5): + tdSql.execute("pause stream s1") + time.sleep(2) + tdSql.execute("resume stream s1") + def initstream(self): + tdLog.debug("========case1 start========") + os.system("nohup taosBenchmark -y -B 1 -t 4 -S 500 -n 1000 -v 3 > /dev/null 2>&1 &") + time.sleep(5) + tdSql.execute("create snode on dnode 1") + tdSql.execute("use test") + tdSql.execute("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(1s)") + tdLog.debug("========create stream using snode and insert data ok========") + self.get_dnode_info() + def redistribute_vnode(self): + tdLog.debug("========redistribute vnode========") + tdSql.redistribute_db_all_vgroups() + self.get_dnode_info() + def replicate_db(self): + tdLog.debug("========replicate db========") + while True: + res = tdSql.getResult("SHOW TRANSACTIONS") + if res == []: + tdLog.debug("========== no transaction, begin to replicate db =========") + tdSql.execute("alter database test replica 3") + return + else: + time.sleep(5) + continue + def check_vnodestate(self): + sql = 'select distinct restored from information_schema.ins_vnodes' + if tdSql.getResult(sql) != [(True,)]: + tdLog.debug(f"vnode not restored, wait 5s") + time.sleep(5) + return False + else: + return True + def run(self): + self.initstream() + self.replicate_db() + self.print_time_info() + self.restart_stream() + time.sleep(60) + self.print_time_info() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())