From 5d8b4bf7c14fd2162b6289154ceff977e9305817 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Sat, 20 Jun 2020 19:10:45 +0800 Subject: [PATCH 1/2] support queue in multi-thread version. --- .../random-test-multi-threading-3.py | 25 ++- .../random-test-multi-threading.py | 155 ++++++++++++++---- tests/pytest/stable/query_after_reset.py | 2 +- tests/pytest/util/sql.py | 6 - 4 files changed, 148 insertions(+), 40 deletions(-) diff --git a/tests/pytest/random-test/random-test-multi-threading-3.py b/tests/pytest/random-test/random-test-multi-threading-3.py index 47c4228a8f..0c8612bc44 100644 --- a/tests/pytest/random-test/random-test-multi-threading-3.py +++ b/tests/pytest/random-test/random-test-multi-threading-3.py @@ -24,6 +24,7 @@ last_tb = "" last_stb = "" written = 0 last_timestamp = 0 +colAdded = False class Test (Thread): @@ -140,6 +141,26 @@ class Test (Thread): last_tb = "" written = 0 + def alter_table_to_add_col(self): + tdLog.info("alter_table_to_add_col") + global last_stb + global colAdded + + if last_stb != "" and colAdded == False: + tdSql.execute( + "alter table %s add column col binary(20)" % + last_stb) + colAdded = True + + def alter_table_to_drop_col(self): + tdLog.info("alter_table_to_drop_col") + global last_stb + global colAdded + + if last_stb != "" and colAdded: + tdSql.execute("alter table %s drop column col" % last_stb) + colAdded = False + def restart_database(self): tdLog.info("restart_database") global last_tb @@ -235,6 +256,8 @@ class Test (Thread): 7: self.reset_database, 8: self.delete_datafiles, 9: self.drop_stable, + 10: self.alter_table_to_add_col, + 11: self.alter_table_to_drop_col, } queryOp = { @@ -256,7 +279,7 @@ class Test (Thread): while True: self.dbEvent.wait() tdLog.notice("second thread") - randDbOp = random.randint(1, 9) + randDbOp = random.randint(1, 11) dbOp.get(randDbOp, lambda: "ERROR")() self.dbEvent.clear() self.dataEvent.clear() diff --git a/tests/pytest/random-test/random-test-multi-threading.py b/tests/pytest/random-test/random-test-multi-threading.py index 65b6dcd948..ff72aa0ea6 100644 --- a/tests/pytest/random-test/random-test-multi-threading.py +++ b/tests/pytest/random-test/random-test-multi-threading.py @@ -14,6 +14,7 @@ import sys import random import threading +import queue from util.log import * from util.cases import * @@ -24,13 +25,16 @@ last_tb = "" last_stb = "" written = 0 last_timestamp = 0 +colAdded = False +killed = False class Test (threading.Thread): - def __init__(self, threadId, name): + def __init__(self, threadId, name, q): threading.Thread.__init__(self) self.threadId = threadId self.name = name + self.q = q self.threadLock = threading.Lock() @@ -38,11 +42,12 @@ class Test (threading.Thread): tdLog.info("create_table") global last_tb global written + global killed current_tb = "tb%d" % int(round(time.time() * 1000)) if (current_tb == last_tb): - return + return 0 else: tdLog.info("will create table %s" % current_tb) @@ -52,8 +57,14 @@ class Test (threading.Thread): current_tb) last_tb = current_tb written = 0 + killed = False except Exception as e: - tdLog.info(repr(e)) + tdLog.info("killed: %d error: %s" % (killed, e.args[0])) + if killed and (e.args[0] == 'network unavailable'): + tdLog.info("database killed, expect failed") + return 0 + return -1 + return 0 def insert_data(self): tdLog.info("insert_data") @@ -75,22 +86,34 @@ class Test (threading.Thread): for j in range(0, insertRows): if (last_tb == ""): tdLog.info("no table, return") - return - tdSql.execute( - 'insert into %s values (%d + %da, %d, "test")' % - (last_tb, start_time, last_timestamp, last_timestamp)) - written = written + 1 - last_timestamp = last_timestamp + 1 + return 0 + + try: + tdSql.execute( + 'insert into %s values (%d + %da, %d, "test")' % + (last_tb, start_time, last_timestamp, last_timestamp)) + written = written + 1 + last_timestamp = last_timestamp + 1 + except Exception as e: + if killed: + tdLog.info( + "database killed, expect failed %s" % + e.args[0]) + return 0 + tdLog.info(repr(e)) + return -1 + return 0 def query_data(self): tdLog.info("query_data") global last_tb - global written + global killed - if (written > 0): + if not killed and last_tb != "": tdLog.info("query data from table") tdSql.query("select * from %s" % last_tb) tdSql.checkRows(written) + return 0 def create_stable(self): tdLog.info("create_stable") @@ -101,9 +124,7 @@ class Test (threading.Thread): current_stb = "stb%d" % int(round(time.time() * 1000)) - if (current_stb == last_stb): - return - else: + if (current_stb != last_stb): tdLog.info("will create stable %s" % current_stb) tdLog.info( 'create table %s(ts timestamp, c1 int, c2 nchar(10)) tags (t1 int, t2 nchar(10))' % @@ -131,6 +152,8 @@ class Test (threading.Thread): written = written + 1 last_timestamp = last_timestamp + 1 + return 0 + def drop_stable(self): tdLog.info("drop_stable") global last_stb @@ -139,31 +162,63 @@ class Test (threading.Thread): if (last_stb == ""): tdLog.info("no super table") - return else: - tdLog.info("will drop last super table") + tdLog.info("will drop last super table %s" % last_stb) tdSql.execute('drop table %s' % last_stb) last_stb = "" last_tb = "" written = 0 + return 0 + + def alter_table_to_add_col(self): + tdLog.info("alter_table_to_add_col") + global last_stb + global colAdded + + if last_stb != "" and colAdded == False: + tdSql.execute( + "alter table %s add column col binary(20)" % + last_stb) + colAdded = True + return 0 + + def alter_table_to_drop_col(self): + tdLog.info("alter_table_to_drop_col") + global last_stb + global colAdded + + if last_stb != "" and not colAdded: + tdSql.execute("alter table %s drop column col" % last_stb) + colAdded = False + return 0 def restart_database(self): tdLog.info("restart_database") global last_tb global written + global killed tdDnodes.stop(1) + killed = True tdDnodes.start(1) -# tdLog.sleep(5) + tdLog.sleep(10) + killed = False + return 0 def force_restart_database(self): tdLog.info("force_restart_database") global last_tb global written + global killed tdDnodes.forcestop(1) + last_tb = "" + written = 0 + killed = True tdDnodes.start(1) # tdLog.sleep(10) + killed = False + return 0 def drop_table(self): tdLog.info("drop_table") @@ -176,6 +231,7 @@ class Test (threading.Thread): tdSql.execute("drop table %s" % last_tb) last_tb = "" written = 0 + return 0 def query_data_from_stable(self): tdLog.info("query_data_from_stable") @@ -183,10 +239,10 @@ class Test (threading.Thread): if (last_stb == ""): tdLog.info("no super table") - return else: tdLog.info("will query data from super table") tdSql.execute('select * from %s' % last_stb) + return 0 def reset_query_cache(self): tdLog.info("reset_query_cache") @@ -196,38 +252,44 @@ class Test (threading.Thread): tdLog.info("reset query cache") tdSql.execute("reset query cache") # tdLog.sleep(1) + return 0 def reset_database(self): tdLog.info("reset_database") global last_tb global last_stb global written + global killed tdDnodes.forcestop(1) + killed = True tdDnodes.deploy(1) tdDnodes.start(1) tdSql.prepare() - last_tb = "" - last_stb = "" - written = 0 + killed = False + return 0 def delete_datafiles(self): tdLog.info("delete_data_files") global last_tb global last_stb global written + global killed dnodesDir = tdDnodes.getDnodesRootDir() tdDnodes.forcestop(1) dataDir = dnodesDir + '/dnode1/data/*' deleteCmd = 'rm -rf %s' % dataDir os.system(deleteCmd) - - tdDnodes.start(1) - tdSql.prepare() last_tb = "" last_stb = "" written = 0 + killed = True + + tdDnodes.start(1) + tdSql.prepare() + killed = False + return 0 def run(self): dataOp = { @@ -246,6 +308,8 @@ class Test (threading.Thread): 7: self.reset_database, 8: self.delete_datafiles, 9: self.drop_stable, + 10: self.alter_table_to_add_col, + 11: self.alter_table_to_drop_col, } if (self.threadId == 1): @@ -253,16 +317,38 @@ class Test (threading.Thread): self.threadLock.acquire() tdLog.notice("first thread") randDataOp = random.randint(1, 3) - dataOp.get(randDataOp, lambda: "ERROR")() - self.threadLock.release() + ret1 = dataOp.get(randDataOp, lambda: "ERROR")() + + if ret1 == -1: + self.q.put(-1) + tdLog.exit("first thread failed") + else: + self.q.put(1) + + if (self.q.get() != -2): + self.threadLock.release() + else: + self.q.put(-1) + tdLog.exit("second thread failed, first thread exit too") elif (self.threadId == 2): while True: - tdLog.notice("second thread") self.threadLock.acquire() - randDbOp = random.randint(1, 9) - dbOp.get(randDbOp, lambda: "ERROR")() - self.threadLock.release() + tdLog.notice("second thread") + randDbOp = random.randint(1, 11) + ret2 = dbOp.get(randDbOp, lambda: "ERROR")() + + if ret2 == -1: + self.q.put(-2) + tdLog.exit("second thread failed") + else: + self.q.put(2) + + if (self.q.get() != -1): + self.threadLock.release() + else: + self.q.put(-2) + tdLog.exit("first thread failed, second exit too") class TDTestCase: @@ -273,14 +359,19 @@ class TDTestCase: def run(self): tdSql.prepare() - test1 = Test(1, "data operation") - test2 = Test(2, "db operation") + q = queue.Queue() + test1 = Test(1, "data operation", q) + test2 = Test(2, "db operation", q) test1.start() test2.start() test1.join() test2.join() + while not q.empty(): + if (q.get() != 0): + tdLog.exit("failed to end of test") + tdLog.info("end of test") def stop(self): diff --git a/tests/pytest/stable/query_after_reset.py b/tests/pytest/stable/query_after_reset.py index 2bc171ae5d..61f6558b83 100644 --- a/tests/pytest/stable/query_after_reset.py +++ b/tests/pytest/stable/query_after_reset.py @@ -126,7 +126,7 @@ class Test: def delete_datafiles(self): tdLog.info("delete data files") dnodesDir = tdDnodes.getDnodesRootDir() - dataDir = dnodesDir + '/dnode1/*' + dataDir = dnodesDir + '/dnode1/data/*' deleteCmd = 'rm -rf %s' % dataDir os.system(deleteCmd) diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 3b86a53343..e282298b7c 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -41,16 +41,12 @@ class TDSql: def prepare(self): tdLog.info("prepare database:db") s = 'reset query cache' - print(s) self.cursor.execute(s) s = 'drop database if exists db' - print(s) self.cursor.execute(s) s = 'create database db' - print(s) self.cursor.execute(s) s = 'use db' - print(s) self.cursor.execute(s) def error(self, sql): @@ -74,7 +70,6 @@ class TDSql: def query(self, sql): self.sql = sql - print(sql) self.cursor.execute(sql) self.queryResult = self.cursor.fetchall() self.queryRows = len(self.queryResult) @@ -191,7 +186,6 @@ class TDSql: def execute(self, sql): self.sql = sql - print(sql) self.affectedRows = self.cursor.execute(sql) return self.affectedRows From 7ed67d90d49825f51c9df370fa9233d11a89d05d Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Sun, 21 Jun 2020 00:35:16 +0800 Subject: [PATCH 2/2] support alter table in random-test. --- .../random-test-multi-threading.py | 4 +- tests/pytest/random-test/random-test.py | 119 +++++++++++++----- 2 files changed, 89 insertions(+), 34 deletions(-) diff --git a/tests/pytest/random-test/random-test-multi-threading.py b/tests/pytest/random-test/random-test-multi-threading.py index ff72aa0ea6..e0c30dbe62 100644 --- a/tests/pytest/random-test/random-test-multi-threading.py +++ b/tests/pytest/random-test/random-test-multi-threading.py @@ -175,7 +175,7 @@ class Test (threading.Thread): global last_stb global colAdded - if last_stb != "" and colAdded == False: + if last_stb != "" and not colAdded: tdSql.execute( "alter table %s add column col binary(20)" % last_stb) @@ -187,7 +187,7 @@ class Test (threading.Thread): global last_stb global colAdded - if last_stb != "" and not colAdded: + if last_stb != "" and colAdded: tdSql.execute("alter table %s drop column col" % last_stb) colAdded = False return 0 diff --git a/tests/pytest/random-test/random-test.py b/tests/pytest/random-test/random-test.py index 38cefb3eec..cecefe379d 100644 --- a/tests/pytest/random-test/random-test.py +++ b/tests/pytest/random-test/random-test.py @@ -25,6 +25,7 @@ class Test: self.last_tb = "" self.last_stb = "" self.written = 0 + self.colAdded = False def create_table(self): tdLog.info("create_table") @@ -39,6 +40,7 @@ class Test: current_tb) self.last_tb = current_tb self.written = 0 + self.colAdded = False def insert_data(self): tdLog.info("insert_data") @@ -50,28 +52,52 @@ class Test: insertRows = 10 tdLog.info("insert %d rows to %s" % (insertRows, self.last_tb)) for i in range(0, insertRows): - ret = tdSql.execute( - 'insert into %s values (now + %dm, %d, "%s")' % - (self.last_tb, i, i, "->" + str(i))) + if self.colAdded: + ret = tdSql.execute( + 'insert into %s values (now + %dm, %d, "%s", "%s")' % + (self.last_tb, i, i, "->" + str(i)), "col") + else: + ret = tdSql.execute( + 'insert into %s values (now + %dm, %d, "%s")' % + (self.last_tb, i, i, "->" + str(i))) + self.written = self.written + 1 tdLog.info("insert earlier data") - tdSql.execute( - 'insert into %s values (now - 5m , 10, " - 5m")' % - self.last_tb) - self.written = self.written + 1 - tdSql.execute( - 'insert into %s values (now - 6m , 10, " - 6m")' % - self.last_tb) - self.written = self.written + 1 - tdSql.execute( - 'insert into %s values (now - 7m , 10, " - 7m")' % - self.last_tb) - self.written = self.written + 1 - tdSql.execute( - 'insert into %s values (now - 8m , 10, " - 8m")' % - self.last_tb) - self.written = self.written + 1 + if self.colAdded: + tdSql.execute( + 'insert into %s values (now - 5m , 10, " - 5m", "col")' % + self.last_tb) + self.written = self.written + 1 + tdSql.execute( + 'insert into %s values (now - 6m , 10, " - 6m", "col")' % + self.last_tb) + self.written = self.written + 1 + tdSql.execute( + 'insert into %s values (now - 7m , 10, " - 7m", "col")' % + self.last_tb) + self.written = self.written + 1 + tdSql.execute( + 'insert into %s values (now - 8m , 10, " - 8m", "col")' % + self.last_tb) + self.written = self.written + 1 + else: + tdSql.execute( + 'insert into %s values (now - 5m , 10, " - 5m")' % + self.last_tb) + self.written = self.written + 1 + tdSql.execute( + 'insert into %s values (now - 6m , 10, " - 6m")' % + self.last_tb) + self.written = self.written + 1 + tdSql.execute( + 'insert into %s values (now - 7m , 10, " - 7m")' % + self.last_tb) + self.written = self.written + 1 + tdSql.execute( + 'insert into %s values (now - 8m , 10, " - 8m")' % + self.last_tb) + self.written = self.written + 1 def query_data(self): tdLog.info("query_data") @@ -88,21 +114,48 @@ class Test: return else: tdLog.info("will create stable %s" % current_stb) + + db = "db" + tdSql.execute("drop database if exists %s" % (db)) + tdSql.execute("reset query cache") + tdSql.execute("create database %s maxrows 200 maxtables 30" % (db)) + tdSql.execute("use %s" % (db)) + tdSql.execute( 'create table %s(ts timestamp, c1 int, c2 nchar(10)) tags (t1 int, t2 nchar(10))' % current_stb) self.last_stb = current_stb + self.colAdded = False - current_tb = "tb%d" % int(round(time.time() * 1000)) - sqlcmd = "create table %s using %s tags (1, 'test')" %(current_tb, self.last_stb) - tdSql.execute(sqlcmd) - self.last_tb = current_tb - self.written = 0 + for k in range(1, 300): + current_tb = "tb%d" % int(round(time.time() * 1000)) + sqlcmd = "create table %s using %s tags (1, 'test')" % ( + current_tb, self.last_stb) + tdSql.execute(sqlcmd) + self.last_tb = current_tb + self.written = 0 + for j in range(1, 100): + tdSql.execute( + "insert into %s values (now + %da, 27, 'wsnchar')" % + (self.last_tb, j)) + self.written = self.written + 1 + + def alter_table_to_add_col(self): + tdLog.info("alter_table_to_add_col") + + if self.last_stb != "" and not self.colAdded: tdSql.execute( - "insert into %s values (now, 27, 'wsnchar')" % - self.last_tb) - self.written = self.written + 1 + "alter table %s add column col binary(20)" % + self.last_stb) + self.colAdded = True + + def alter_table_to_drop_col(self): + tdLog.info("alter_table_to_drop_col") + + if self.last_stb != "" and self.colAdded: + tdSql.execute("alter table %s drop column col" % self.last_stb) + self.colAdded = False def drop_stable(self): tdLog.info("drop_stable") @@ -126,16 +179,16 @@ class Test: tdSql.execute('select * from %s' % self.last_stb) def restart_database(self): - tdLog.info("restart_databae") + tdLog.info("restart_database") tdDnodes.stop(1) tdDnodes.start(1) - tdLog.sleep(5) + tdLog.sleep(10) def force_restart_database(self): tdLog.info("force_restart_database") tdDnodes.forcestop(1) tdDnodes.start(1) - tdLog.sleep(5) + tdLog.sleep(10) tdSql.prepare() self.last_tb = "" self.last_stb = "" @@ -161,7 +214,7 @@ class Test: self.last_tb = "" self.written = 0 tdDnodes.start(1) - tdLog.sleep(5) + tdLog.sleep(10) tdSql.prepare() self.last_tb = "" self.last_stb = "" @@ -209,10 +262,12 @@ class TDTestCase: 10: test.delete_datafiles, 11: test.query_data_from_stable, 12: test.drop_stable, + 13: test.alter_table_to_add_col, + 14: test.alter_table_to_drop_col, } for x in range(1, 1000): - r = random.randint(1, 12) + r = random.randint(1, 14) tdLog.notice("iteration %d run func %d" % (x, r)) switch.get(r, lambda: "ERROR")()