refactor multi-threading random test cases.
This commit is contained in:
parent
805ed16570
commit
2c4482d7bc
|
@ -13,7 +13,7 @@
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
import random
|
import random
|
||||||
import threading
|
from threading import Thread, Event
|
||||||
|
|
||||||
from util.log import *
|
from util.log import *
|
||||||
from util.cases import *
|
from util.cases import *
|
||||||
|
@ -23,15 +23,15 @@ from util.dnodes import *
|
||||||
last_tb = ""
|
last_tb = ""
|
||||||
last_stb = ""
|
last_stb = ""
|
||||||
written = 0
|
written = 0
|
||||||
|
last_timestamp = 0
|
||||||
|
|
||||||
|
|
||||||
class Test (threading.Thread):
|
class Test (Thread):
|
||||||
def __init__(self, threadId, name):
|
def __init__(self, threadId, name, events):
|
||||||
threading.Thread.__init__(self)
|
Thread.__init__(self)
|
||||||
self.threadId = threadId
|
self.threadId = threadId
|
||||||
self.name = name
|
self.name = name
|
||||||
|
self.dataEvent, self.dbEvent, self.queryEvent = events
|
||||||
self.threadLock = threading.Lock()
|
|
||||||
|
|
||||||
def create_table(self):
|
def create_table(self):
|
||||||
tdLog.info("create_table")
|
tdLog.info("create_table")
|
||||||
|
@ -47,7 +47,7 @@ class Test (threading.Thread):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
tdSql.execute(
|
tdSql.execute(
|
||||||
'create table %s (ts timestamp, speed int)' %
|
'create table %s (ts timestamp, speed int, c2 nchar(10))' %
|
||||||
current_tb)
|
current_tb)
|
||||||
last_tb = current_tb
|
last_tb = current_tb
|
||||||
written = 0
|
written = 0
|
||||||
|
@ -58,21 +58,28 @@ class Test (threading.Thread):
|
||||||
tdLog.info("insert_data")
|
tdLog.info("insert_data")
|
||||||
global last_tb
|
global last_tb
|
||||||
global written
|
global written
|
||||||
|
global last_timestamp
|
||||||
|
|
||||||
if (last_tb == ""):
|
if (last_tb == ""):
|
||||||
tdLog.info("no table, create first")
|
tdLog.info("no table, create first")
|
||||||
self.create_table()
|
self.create_table()
|
||||||
|
|
||||||
|
start_time = 1500000000000
|
||||||
|
|
||||||
tdLog.info("will insert data to table")
|
tdLog.info("will insert data to table")
|
||||||
for i in range(0, 10):
|
for i in range(0, 10):
|
||||||
insertRows = 1000
|
insertRows = 1000
|
||||||
tdLog.info("insert %d rows to %s" % (insertRows, last_tb))
|
tdLog.info("insert %d rows to %s" % (insertRows, last_tb))
|
||||||
|
|
||||||
for j in range(0, insertRows):
|
for j in range(0, insertRows):
|
||||||
ret = tdSql.execute(
|
if (last_tb == ""):
|
||||||
'insert into %s values (now + %dm, %d)' %
|
tdLog.info("no table, return")
|
||||||
(last_tb, j, j))
|
return
|
||||||
|
tdSql.execute(
|
||||||
|
'insert into %s values (%d + %da, %d, "test")' %
|
||||||
|
(last_tb, start_time, last_timestamp, last_timestamp))
|
||||||
written = written + 1
|
written = written + 1
|
||||||
|
last_timestamp = last_timestamp + 1
|
||||||
|
|
||||||
def query_data(self):
|
def query_data(self):
|
||||||
tdLog.info("query_data")
|
tdLog.info("query_data")
|
||||||
|
@ -89,6 +96,7 @@ class Test (threading.Thread):
|
||||||
global last_tb
|
global last_tb
|
||||||
global last_stb
|
global last_stb
|
||||||
global written
|
global written
|
||||||
|
global last_timestamp
|
||||||
|
|
||||||
current_stb = "stb%d" % int(round(time.time() * 1000))
|
current_stb = "stb%d" % int(round(time.time() * 1000))
|
||||||
|
|
||||||
|
@ -106,10 +114,15 @@ class Test (threading.Thread):
|
||||||
"create table %s using %s tags (1, '表1')" %
|
"create table %s using %s tags (1, '表1')" %
|
||||||
(current_tb, last_stb))
|
(current_tb, last_stb))
|
||||||
last_tb = current_tb
|
last_tb = current_tb
|
||||||
|
written = 0
|
||||||
|
|
||||||
|
start_time = 1500000000000
|
||||||
|
|
||||||
tdSql.execute(
|
tdSql.execute(
|
||||||
"insert into %s values (now, 27, '我是nchar字符串')" %
|
"insert into %s values (%d+%da, 27, '我是nchar字符串')" %
|
||||||
last_tb)
|
(last_tb, start_time, last_timestamp))
|
||||||
written = written + 1
|
written = written + 1
|
||||||
|
last_timestamp = last_timestamp + 1
|
||||||
|
|
||||||
def drop_stable(self):
|
def drop_stable(self):
|
||||||
tdLog.info("drop_stable")
|
tdLog.info("drop_stable")
|
||||||
|
@ -205,8 +218,6 @@ class Test (threading.Thread):
|
||||||
def run(self):
|
def run(self):
|
||||||
dataOp = {
|
dataOp = {
|
||||||
1: self.insert_data,
|
1: self.insert_data,
|
||||||
2: self.query_data,
|
|
||||||
3: self.query_data_from_stable,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dbOp = {
|
dbOp = {
|
||||||
|
@ -228,26 +239,33 @@ class Test (threading.Thread):
|
||||||
|
|
||||||
if (self.threadId == 1):
|
if (self.threadId == 1):
|
||||||
while True:
|
while True:
|
||||||
self.threadLock.acquire()
|
self.dataEvent.wait()
|
||||||
tdLog.notice("first thread")
|
tdLog.notice("first thread")
|
||||||
randDataOp = random.randint(1, 3)
|
randDataOp = random.randint(1, 1)
|
||||||
dataOp.get(randDataOp, lambda: "ERROR")()
|
dataOp.get(randDataOp, lambda: "ERROR")()
|
||||||
self.threadLock.release()
|
self.dataEvent.clear()
|
||||||
|
self.queryEvent.clear()
|
||||||
|
self.dbEvent.set()
|
||||||
|
|
||||||
elif (self.threadId == 2):
|
elif (self.threadId == 2):
|
||||||
while True:
|
while True:
|
||||||
|
self.dbEvent.wait()
|
||||||
tdLog.notice("second thread")
|
tdLog.notice("second thread")
|
||||||
self.threadLock.acquire()
|
|
||||||
randDbOp = random.randint(1, 9)
|
randDbOp = random.randint(1, 9)
|
||||||
dbOp.get(randDbOp, lambda: "ERROR")()
|
dbOp.get(randDbOp, lambda: "ERROR")()
|
||||||
self.threadLock.release()
|
self.dbEvent.clear()
|
||||||
|
self.dataEvent.clear()
|
||||||
|
self.queryEvent.set()
|
||||||
|
|
||||||
elif (self.threadId == 3):
|
elif (self.threadId == 3):
|
||||||
while True:
|
while True:
|
||||||
|
self.queryEvent.wait()
|
||||||
tdLog.notice("third thread")
|
tdLog.notice("third thread")
|
||||||
self.threadLock.acquire()
|
|
||||||
randQueryOp = random.randint(1, 9)
|
randQueryOp = random.randint(1, 9)
|
||||||
queryOp.get(randQueryOp, lambda: "ERROR")()
|
queryOp.get(randQueryOp, lambda: "ERROR")()
|
||||||
self.threadLock.release()
|
self.queryEvent.clear()
|
||||||
|
self.dbEvent.clear()
|
||||||
|
self.dataEvent.set()
|
||||||
|
|
||||||
|
|
||||||
class TDTestCase:
|
class TDTestCase:
|
||||||
|
@ -258,13 +276,19 @@ class TDTestCase:
|
||||||
def run(self):
|
def run(self):
|
||||||
tdSql.prepare()
|
tdSql.prepare()
|
||||||
|
|
||||||
test1 = Test(1, "data operation")
|
events = [Event() for _ in range(3)]
|
||||||
test2 = Test(2, "db operation")
|
events[0].set()
|
||||||
test2 = Test(3, "query operation")
|
events[1].clear()
|
||||||
|
events[1].clear()
|
||||||
|
|
||||||
|
test1 = Test(1, "data operation", events)
|
||||||
|
test2 = Test(2, "db operation", events)
|
||||||
|
test3 = Test(3, "query operation", events)
|
||||||
|
|
||||||
test1.start()
|
test1.start()
|
||||||
test2.start()
|
test2.start()
|
||||||
test3.start()
|
test3.start()
|
||||||
|
|
||||||
test1.join()
|
test1.join()
|
||||||
test2.join()
|
test2.join()
|
||||||
test3.join()
|
test3.join()
|
||||||
|
|
|
@ -23,6 +23,7 @@ from util.dnodes import *
|
||||||
last_tb = ""
|
last_tb = ""
|
||||||
last_stb = ""
|
last_stb = ""
|
||||||
written = 0
|
written = 0
|
||||||
|
last_timestamp = 0
|
||||||
|
|
||||||
|
|
||||||
class Test (threading.Thread):
|
class Test (threading.Thread):
|
||||||
|
@ -47,7 +48,7 @@ class Test (threading.Thread):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
tdSql.execute(
|
tdSql.execute(
|
||||||
'create table %s (ts timestamp, speed int)' %
|
'create table %s (ts timestamp, speed int, c1 nchar(10))' %
|
||||||
current_tb)
|
current_tb)
|
||||||
last_tb = current_tb
|
last_tb = current_tb
|
||||||
written = 0
|
written = 0
|
||||||
|
@ -58,21 +59,28 @@ class Test (threading.Thread):
|
||||||
tdLog.info("insert_data")
|
tdLog.info("insert_data")
|
||||||
global last_tb
|
global last_tb
|
||||||
global written
|
global written
|
||||||
|
global last_timestamp
|
||||||
|
|
||||||
if (last_tb == ""):
|
if (last_tb == ""):
|
||||||
tdLog.info("no table, create first")
|
tdLog.info("no table, create first")
|
||||||
self.create_table()
|
self.create_table()
|
||||||
|
|
||||||
|
start_time = 1500000000000
|
||||||
|
|
||||||
tdLog.info("will insert data to table")
|
tdLog.info("will insert data to table")
|
||||||
for i in range(0, 10):
|
for i in range(0, 10):
|
||||||
insertRows = 1000
|
insertRows = 1000
|
||||||
tdLog.info("insert %d rows to %s" % (insertRows, last_tb))
|
tdLog.info("insert %d rows to %s" % (insertRows, last_tb))
|
||||||
|
|
||||||
for j in range(0, insertRows):
|
for j in range(0, insertRows):
|
||||||
ret = tdSql.execute(
|
if (last_tb == ""):
|
||||||
'insert into %s values (now + %dm, %d)' %
|
tdLog.info("no table, return")
|
||||||
(last_tb, j, j))
|
return
|
||||||
|
tdSql.execute(
|
||||||
|
'insert into %s values (%d + %da, %d, "test")' %
|
||||||
|
(last_tb, start_time, last_timestamp, last_timestamp))
|
||||||
written = written + 1
|
written = written + 1
|
||||||
|
last_timestamp = last_timestamp + 1
|
||||||
|
|
||||||
def query_data(self):
|
def query_data(self):
|
||||||
tdLog.info("query_data")
|
tdLog.info("query_data")
|
||||||
|
@ -89,6 +97,7 @@ class Test (threading.Thread):
|
||||||
global last_tb
|
global last_tb
|
||||||
global last_stb
|
global last_stb
|
||||||
global written
|
global written
|
||||||
|
global last_timestamp
|
||||||
|
|
||||||
current_stb = "stb%d" % int(round(time.time() * 1000))
|
current_stb = "stb%d" % int(round(time.time() * 1000))
|
||||||
|
|
||||||
|
@ -106,10 +115,15 @@ class Test (threading.Thread):
|
||||||
"create table %s using %s tags (1, '表1')" %
|
"create table %s using %s tags (1, '表1')" %
|
||||||
(current_tb, last_stb))
|
(current_tb, last_stb))
|
||||||
last_tb = current_tb
|
last_tb = current_tb
|
||||||
|
written = 0
|
||||||
|
|
||||||
|
start_time = 1500000000000
|
||||||
|
|
||||||
tdSql.execute(
|
tdSql.execute(
|
||||||
"insert into %s values (now, 27, '我是nchar字符串')" %
|
"insert into %s values (%d+%da, 27, '我是nchar字符串')" %
|
||||||
last_tb)
|
(last_tb, start_time, last_timestamp))
|
||||||
written = written + 1
|
written = written + 1
|
||||||
|
last_timestamp = last_timestamp + 1
|
||||||
|
|
||||||
def drop_stable(self):
|
def drop_stable(self):
|
||||||
tdLog.info("drop_stable")
|
tdLog.info("drop_stable")
|
||||||
|
@ -130,7 +144,7 @@ class Test (threading.Thread):
|
||||||
|
|
||||||
tdDnodes.stop(1)
|
tdDnodes.stop(1)
|
||||||
tdDnodes.start(1)
|
tdDnodes.start(1)
|
||||||
tdLog.sleep(5)
|
# tdLog.sleep(5)
|
||||||
|
|
||||||
def force_restart_database(self):
|
def force_restart_database(self):
|
||||||
tdLog.info("force_restart_database")
|
tdLog.info("force_restart_database")
|
||||||
|
@ -139,7 +153,7 @@ class Test (threading.Thread):
|
||||||
|
|
||||||
tdDnodes.forcestop(1)
|
tdDnodes.forcestop(1)
|
||||||
tdDnodes.start(1)
|
tdDnodes.start(1)
|
||||||
tdLog.sleep(10)
|
# tdLog.sleep(10)
|
||||||
|
|
||||||
def drop_table(self):
|
def drop_table(self):
|
||||||
tdLog.info("drop_table")
|
tdLog.info("drop_table")
|
||||||
|
@ -171,7 +185,7 @@ class Test (threading.Thread):
|
||||||
|
|
||||||
tdLog.info("reset query cache")
|
tdLog.info("reset query cache")
|
||||||
tdSql.execute("reset query cache")
|
tdSql.execute("reset query cache")
|
||||||
tdLog.sleep(1)
|
# tdLog.sleep(1)
|
||||||
|
|
||||||
def reset_database(self):
|
def reset_database(self):
|
||||||
tdLog.info("reset_database")
|
tdLog.info("reset_database")
|
||||||
|
@ -181,16 +195,15 @@ class Test (threading.Thread):
|
||||||
|
|
||||||
tdDnodes.forcestop(1)
|
tdDnodes.forcestop(1)
|
||||||
tdDnodes.deploy(1)
|
tdDnodes.deploy(1)
|
||||||
tdDnodes.start(1)
|
|
||||||
tdSql.prepare()
|
|
||||||
last_tb = ""
|
last_tb = ""
|
||||||
last_stb = ""
|
last_stb = ""
|
||||||
written = 0
|
written = 0
|
||||||
|
tdDnodes.start(1)
|
||||||
|
tdSql.prepare()
|
||||||
|
|
||||||
def delete_datafiles(self):
|
def delete_datafiles(self):
|
||||||
tdLog.info("delete_data_files")
|
tdLog.info("delete_data_files")
|
||||||
global last_tb
|
global last_tb
|
||||||
global last_stb
|
|
||||||
global written
|
global written
|
||||||
|
|
||||||
dnodesDir = tdDnodes.getDnodesRootDir()
|
dnodesDir = tdDnodes.getDnodesRootDir()
|
||||||
|
@ -198,12 +211,11 @@ class Test (threading.Thread):
|
||||||
deleteCmd = 'rm -rf %s' % dataDir
|
deleteCmd = 'rm -rf %s' % dataDir
|
||||||
os.system(deleteCmd)
|
os.system(deleteCmd)
|
||||||
|
|
||||||
tdDnodes.start(1)
|
|
||||||
tdLog.sleep(10)
|
|
||||||
tdSql.prepare()
|
|
||||||
last_tb = ""
|
last_tb = ""
|
||||||
last_stb = ""
|
|
||||||
written = 0
|
written = 0
|
||||||
|
tdDnodes.start(1)
|
||||||
|
# tdLog.sleep(10)
|
||||||
|
tdSql.prepare()
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
dataOp = {
|
dataOp = {
|
||||||
|
|
|
@ -160,7 +160,7 @@ class TDDnode:
|
||||||
self.cfg("logDir", self.logDir)
|
self.cfg("logDir", self.logDir)
|
||||||
self.cfg("numOfLogLines", "100000000")
|
self.cfg("numOfLogLines", "100000000")
|
||||||
self.cfg("mnodeEqualVnodeNum", "0")
|
self.cfg("mnodeEqualVnodeNum", "0")
|
||||||
self.cfg("clog", "1")
|
self.cfg("walLevel", "1")
|
||||||
self.cfg("statusInterval", "1")
|
self.cfg("statusInterval", "1")
|
||||||
self.cfg("numOfTotalVnodes", "64")
|
self.cfg("numOfTotalVnodes", "64")
|
||||||
self.cfg("numOfMnodes", "3")
|
self.cfg("numOfMnodes", "3")
|
||||||
|
|
Loading…
Reference in New Issue