Merge remote-tracking branch 'origin/develop' into feature/wal
This commit is contained in:
commit
3cfad2ed2f
|
@ -17,6 +17,7 @@ import json
|
||||||
import time
|
import time
|
||||||
import random
|
import random
|
||||||
import requests
|
import requests
|
||||||
|
import argparse
|
||||||
from requests.auth import HTTPBasicAuth
|
from requests.auth import HTTPBasicAuth
|
||||||
func_list=['avg','count','twa','sum','stddev','leastsquares','min',
|
func_list=['avg','count','twa','sum','stddev','leastsquares','min',
|
||||||
'max','first','last','top','bottom','percentile','apercentile',
|
'max','first','last','top','bottom','percentile','apercentile',
|
||||||
|
@ -32,19 +33,33 @@ condition_list=[
|
||||||
]
|
]
|
||||||
where_list = ['_c0>now-10d',' <50'," like \'%a%\'"]
|
where_list = ['_c0>now-10d',' <50'," like \'%a%\'"]
|
||||||
class ConcurrentInquiry:
|
class ConcurrentInquiry:
|
||||||
def __init__(self,n_Therads=25,r_Therads=25):
|
# def __init__(self,ts=1500000001000,host='127.0.0.1',user='root',password='taosdata',dbname='test',
|
||||||
|
# stb_prefix='st',subtb_prefix='t',n_Therads=10,r_Therads=10,probabilities=0.05,loop=5,
|
||||||
|
# stableNum = 2,subtableNum = 1000,insertRows = 100):
|
||||||
|
def __init__(self,ts,host,user,password,dbname,
|
||||||
|
stb_prefix,subtb_prefix,n_Therads,r_Therads,probabilities,loop,
|
||||||
|
stableNum ,subtableNum ,insertRows ):
|
||||||
self.n_numOfTherads = n_Therads
|
self.n_numOfTherads = n_Therads
|
||||||
self.r_numOfTherads = r_Therads
|
self.r_numOfTherads = r_Therads
|
||||||
self.ts=1500000001000
|
self.ts=ts
|
||||||
self.dbname='test'
|
self.host = host
|
||||||
|
self.user = user
|
||||||
|
self.password = password
|
||||||
|
self.dbname=dbname
|
||||||
|
self.stb_prefix = stb_prefix
|
||||||
|
self.subtb_prefix = subtb_prefix
|
||||||
self.stb_list=[]
|
self.stb_list=[]
|
||||||
self.subtb_list=[]
|
self.subtb_list=[]
|
||||||
self.stb_stru_list=[]
|
self.stb_stru_list=[]
|
||||||
self.subtb_stru_list=[]
|
self.subtb_stru_list=[]
|
||||||
self.stb_tag_list=[]
|
self.stb_tag_list=[]
|
||||||
self.subtb_tag_list=[]
|
self.subtb_tag_list=[]
|
||||||
self.probabilities = [0.05,0.95]
|
self.probabilities = [probabilities,1-probabilities]
|
||||||
self.ifjoin = [0,1]
|
self.ifjoin = [0,1]
|
||||||
|
self.loop = loop
|
||||||
|
self.stableNum = stableNum
|
||||||
|
self.subtableNum = subtableNum
|
||||||
|
self.insertRows = insertRows
|
||||||
def SetThreadsNum(self,num):
|
def SetThreadsNum(self,num):
|
||||||
self.numOfTherads=num
|
self.numOfTherads=num
|
||||||
|
|
||||||
|
@ -88,9 +103,9 @@ class ConcurrentInquiry:
|
||||||
self.subtb_tag_list.append(tag)
|
self.subtb_tag_list.append(tag)
|
||||||
|
|
||||||
def get_full(self): #获取所有的表、表结构
|
def get_full(self): #获取所有的表、表结构
|
||||||
host = "127.0.0.1"
|
host = self.host
|
||||||
user = "root"
|
user = self.user
|
||||||
password = "taosdata"
|
password = self.password
|
||||||
conn = taos.connect(
|
conn = taos.connect(
|
||||||
host,
|
host,
|
||||||
user,
|
user,
|
||||||
|
@ -117,7 +132,7 @@ class ConcurrentInquiry:
|
||||||
return 'where '+random.choice([' and ',' or ']).join(l)
|
return 'where '+random.choice([' and ',' or ']).join(l)
|
||||||
|
|
||||||
def con_interval(self,tlist,col_list,tag_list):
|
def con_interval(self,tlist,col_list,tag_list):
|
||||||
interval = 'interval(' + str(random.randint(0,100)) + random.choice(['a','s','d','w','n','y']) + ')'
|
interval = 'interval(' + str(random.randint(0,20)) + random.choice(['a','s','d','w','n','y']) + ')'
|
||||||
return interval
|
return interval
|
||||||
|
|
||||||
def con_limit(self,tlist,col_list,tag_list):
|
def con_limit(self,tlist,col_list,tag_list):
|
||||||
|
@ -133,7 +148,7 @@ class ConcurrentInquiry:
|
||||||
def con_group(self,tlist,col_list,tag_list):
|
def con_group(self,tlist,col_list,tag_list):
|
||||||
rand_tag = random.randint(0,5)
|
rand_tag = random.randint(0,5)
|
||||||
rand_col = random.randint(0,1)
|
rand_col = random.randint(0,1)
|
||||||
return 'group by '+','.join(random.sample(col_list,rand_col))+','.join(random.sample(tag_list,rand_tag))
|
return 'group by '+','.join(random.sample(col_list,rand_col) + random.sample(tag_list,rand_tag))
|
||||||
|
|
||||||
def con_order(self,tlist,col_list,tag_list):
|
def con_order(self,tlist,col_list,tag_list):
|
||||||
return 'order by '+random.choice(tlist)
|
return 'order by '+random.choice(tlist)
|
||||||
|
@ -165,8 +180,10 @@ class ConcurrentInquiry:
|
||||||
random.shuffle(func_list)
|
random.shuffle(func_list)
|
||||||
sel_col_list=[]
|
sel_col_list=[]
|
||||||
col_rand=random.randint(0,len(col_list))
|
col_rand=random.randint(0,len(col_list))
|
||||||
|
loop = 0
|
||||||
for i,j in zip(col_list[0:col_rand],func_list): #决定每个被查询col的函数
|
for i,j in zip(col_list[0:col_rand],func_list): #决定每个被查询col的函数
|
||||||
alias = ' as '+ str(i)
|
alias = ' as '+ 'taos%d ' % loop
|
||||||
|
loop += 1
|
||||||
pick_func = ''
|
pick_func = ''
|
||||||
if j == 'leastsquares':
|
if j == 'leastsquares':
|
||||||
pick_func=j+'('+i+',1,1)'
|
pick_func=j+'('+i+',1,1)'
|
||||||
|
@ -185,7 +202,7 @@ class ConcurrentInquiry:
|
||||||
for i in sel_con:
|
for i in sel_con:
|
||||||
sel_con_list.append(i(tlist,col_list,tag_list)) #获取对应的条件函数
|
sel_con_list.append(i(tlist,col_list,tag_list)) #获取对应的条件函数
|
||||||
sql+=' '.join(sel_con_list) # condition
|
sql+=' '.join(sel_con_list) # condition
|
||||||
print(sql)
|
#print(sql)
|
||||||
return sql
|
return sql
|
||||||
|
|
||||||
def gen_query_join(self): #生成join查询语句
|
def gen_query_join(self): #生成join查询语句
|
||||||
|
@ -236,8 +253,6 @@ class ConcurrentInquiry:
|
||||||
else:
|
else:
|
||||||
join_section = ''.join(random.choices(col_intersection+tag_intersection))
|
join_section = ''.join(random.choices(col_intersection+tag_intersection))
|
||||||
sql += 'where t1._c0 = t2._c0 and ' + 't1.' + join_section + '=t2.' + join_section
|
sql += 'where t1._c0 = t2._c0 and ' + 't1.' + join_section + '=t2.' + join_section
|
||||||
|
|
||||||
print(sql)
|
|
||||||
return sql
|
return sql
|
||||||
|
|
||||||
def random_pick(self):
|
def random_pick(self):
|
||||||
|
@ -248,16 +263,48 @@ class ConcurrentInquiry:
|
||||||
if x < cumulative_probability:break
|
if x < cumulative_probability:break
|
||||||
return item
|
return item
|
||||||
|
|
||||||
|
def gen_data(self):
|
||||||
|
stableNum = self.stableNum
|
||||||
|
subtableNum = self.subtableNum
|
||||||
|
insertRows = self.insertRows
|
||||||
|
t0 = self.ts
|
||||||
|
host = self.host
|
||||||
|
user = self.user
|
||||||
|
password = self.password
|
||||||
|
conn = taos.connect(
|
||||||
|
host,
|
||||||
|
user,
|
||||||
|
password,
|
||||||
|
)
|
||||||
|
cl = conn.cursor()
|
||||||
|
cl.execute("drop database if exists %s;" %self.dbname)
|
||||||
|
cl.execute("create database if not exists %s;" %self.dbname)
|
||||||
|
cl.execute("use %s" % self.dbname)
|
||||||
|
for k in range(stableNum):
|
||||||
|
sql="create table %s (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool,c8 binary(20),c9 nchar(20)) \
|
||||||
|
tags(t1 int, t2 float, t3 bigint, t4 smallint, t5 tinyint, t6 double, t7 bool,t8 binary(20),t9 nchar(20))" % (self.stb_prefix+str(k))
|
||||||
|
cl.execute(sql)
|
||||||
|
for j in range(subtableNum):
|
||||||
|
sql = "create table %s using %s tags(%d,%d,%d,%d,%d,%d,%d,'%s','%s')" % \
|
||||||
|
(self.subtb_prefix+str(k)+'_'+str(j),self.stb_prefix+str(k),j,j/2.0,j%41,j%51,j%53,j*1.0,j%2,'taos'+str(j),'涛思'+str(j))
|
||||||
|
print(sql)
|
||||||
|
cl.execute(sql)
|
||||||
|
for i in range(insertRows):
|
||||||
|
ret = cl.execute(
|
||||||
|
"insert into %s values (%d , %d,%d,%d,%d,%d,%d,%d,'%s','%s')" %
|
||||||
|
(self.subtb_prefix+str(k)+'_'+str(j),t0+i,i%100,i/2.0,i%41,i%51,i%53,i*1.0,i%2,'taos'+str(i),'涛思'+str(i)))
|
||||||
|
cl.close()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
def rest_query(self,sql): #rest 接口
|
def rest_query(self,sql): #rest 接口
|
||||||
host = "127.0.0.1"
|
host = self.host
|
||||||
user = "root"
|
user = self.user
|
||||||
password = "taosdata"
|
password = self.password
|
||||||
port =6041
|
port =6041
|
||||||
url = "http://{}:{}/rest/sql".format(host, port )
|
url = "http://{}:{}/rest/sql".format(host, port )
|
||||||
try:
|
try:
|
||||||
r = requests.post(url,
|
r = requests.post(url,
|
||||||
data = 'use test',
|
data = 'use %s' % self.dbname,
|
||||||
auth = HTTPBasicAuth('root', 'taosdata'))
|
auth = HTTPBasicAuth('root', 'taosdata'))
|
||||||
r = requests.post(url,
|
r = requests.post(url,
|
||||||
data = sql,
|
data = sql,
|
||||||
|
@ -287,20 +334,20 @@ class ConcurrentInquiry:
|
||||||
|
|
||||||
|
|
||||||
def query_thread_n(self,threadID): #使用原生python接口查询
|
def query_thread_n(self,threadID): #使用原生python接口查询
|
||||||
host = "127.0.0.1"
|
host = self.host
|
||||||
user = "root"
|
user = self.user
|
||||||
password = "taosdata"
|
password = self.password
|
||||||
conn = taos.connect(
|
conn = taos.connect(
|
||||||
host,
|
host,
|
||||||
user,
|
user,
|
||||||
password,
|
password,
|
||||||
)
|
)
|
||||||
cl = conn.cursor()
|
cl = conn.cursor()
|
||||||
cl.execute("use test;")
|
cl.execute("use %s;" % self.dbname)
|
||||||
|
|
||||||
print("Thread %d: starting" % threadID)
|
print("Thread %d: starting" % threadID)
|
||||||
|
loop = self.loop
|
||||||
while True:
|
while loop:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if self.random_pick():
|
if self.random_pick():
|
||||||
|
@ -314,33 +361,40 @@ class ConcurrentInquiry:
|
||||||
end = time.time()
|
end = time.time()
|
||||||
print("time cost :",end-start)
|
print("time cost :",end-start)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
print('-'*40)
|
||||||
print(
|
print(
|
||||||
"Failure thread%d, sql: %s,exception: %s" %
|
"Failure thread%d, sql: %s \nexception: %s" %
|
||||||
(threadID, str(sql),str(e)))
|
(threadID, str(sql),str(e)))
|
||||||
#exit(-1)
|
#exit(-1)
|
||||||
|
loop -= 1
|
||||||
|
if loop == 0: break
|
||||||
|
|
||||||
|
cl.close()
|
||||||
|
conn.close()
|
||||||
print("Thread %d: finishing" % threadID)
|
print("Thread %d: finishing" % threadID)
|
||||||
|
|
||||||
def query_thread_r(self,threadID): #使用rest接口查询
|
def query_thread_r(self,threadID): #使用rest接口查询
|
||||||
print("Thread %d: starting" % threadID)
|
print("Thread %d: starting" % threadID)
|
||||||
while True:
|
loop = self.loop
|
||||||
try:
|
while loop:
|
||||||
if self.random_pick():
|
try:
|
||||||
sql=self.gen_query_sql()
|
if self.random_pick():
|
||||||
else:
|
sql=self.gen_query_sql()
|
||||||
sql=self.gen_query_join()
|
else:
|
||||||
print("sql is ",sql)
|
sql=self.gen_query_join()
|
||||||
start = time.time()
|
print("sql is ",sql)
|
||||||
self.rest_query(sql)
|
start = time.time()
|
||||||
end = time.time()
|
self.rest_query(sql)
|
||||||
print("time cost :",end-start)
|
end = time.time()
|
||||||
except Exception as e:
|
print("time cost :",end-start)
|
||||||
print(
|
except Exception as e:
|
||||||
"Failure thread%d, sql: %s,exception: %s" %
|
print('-'*40)
|
||||||
(threadID, str(sql),str(e)))
|
print(
|
||||||
#exit(-1)
|
"Failure thread%d, sql: %s \nexception: %s" %
|
||||||
|
(threadID, str(sql),str(e)))
|
||||||
|
#exit(-1)
|
||||||
|
loop -= 1
|
||||||
|
if loop == 0: break
|
||||||
|
|
||||||
print("Thread %d: finishing" % threadID)
|
print("Thread %d: finishing" % threadID)
|
||||||
|
|
||||||
|
@ -355,10 +409,124 @@ class ConcurrentInquiry:
|
||||||
thread = threading.Thread(target=self.query_thread_r, args=(i,))
|
thread = threading.Thread(target=self.query_thread_r, args=(i,))
|
||||||
threads.append(thread)
|
threads.append(thread)
|
||||||
thread.start()
|
thread.start()
|
||||||
if len(sys.argv)>1:
|
|
||||||
q = ConcurrentInquiry(n_Therads=sys.argv[1],r_Therads=sys.argv[2])
|
parser = argparse.ArgumentParser()
|
||||||
else:
|
parser.add_argument(
|
||||||
q = ConcurrentInquiry()
|
'-H',
|
||||||
|
'--host-name',
|
||||||
|
action='store',
|
||||||
|
default='127.0.0.1',
|
||||||
|
type=str,
|
||||||
|
help='host name to be connected (default: 127.0.0.1)')
|
||||||
|
parser.add_argument(
|
||||||
|
'-S',
|
||||||
|
'--ts',
|
||||||
|
action='store',
|
||||||
|
default=1500000000000,
|
||||||
|
type=int,
|
||||||
|
help='insert data from timestamp (default: 1500000000000)')
|
||||||
|
parser.add_argument(
|
||||||
|
'-d',
|
||||||
|
'--db-name',
|
||||||
|
action='store',
|
||||||
|
default='test',
|
||||||
|
type=str,
|
||||||
|
help='Database name to be created (default: test)')
|
||||||
|
parser.add_argument(
|
||||||
|
'-t',
|
||||||
|
'--number-of-native-threads',
|
||||||
|
action='store',
|
||||||
|
default=10,
|
||||||
|
type=int,
|
||||||
|
help='Number of native threads (default: 10)')
|
||||||
|
parser.add_argument(
|
||||||
|
'-T',
|
||||||
|
'--number-of-rest-threads',
|
||||||
|
action='store',
|
||||||
|
default=10,
|
||||||
|
type=int,
|
||||||
|
help='Number of rest threads (default: 10)')
|
||||||
|
parser.add_argument(
|
||||||
|
'-r',
|
||||||
|
'--number-of-records',
|
||||||
|
action='store',
|
||||||
|
default=100,
|
||||||
|
type=int,
|
||||||
|
help='Number of record to be created for each table (default: 100)')
|
||||||
|
parser.add_argument(
|
||||||
|
'-c',
|
||||||
|
'--create-table',
|
||||||
|
action='store',
|
||||||
|
default='0',
|
||||||
|
type=int,
|
||||||
|
help='whether gen data (default: 0)')
|
||||||
|
parser.add_argument(
|
||||||
|
'-p',
|
||||||
|
'--subtb-name-prefix',
|
||||||
|
action='store',
|
||||||
|
default='t',
|
||||||
|
type=str,
|
||||||
|
help='subtable-name-prefix (default: t)')
|
||||||
|
parser.add_argument(
|
||||||
|
'-P',
|
||||||
|
'--stb-name-prefix',
|
||||||
|
action='store',
|
||||||
|
default='st',
|
||||||
|
type=str,
|
||||||
|
help='stable-name-prefix (default: st)')
|
||||||
|
parser.add_argument(
|
||||||
|
'-b',
|
||||||
|
'--probabilities',
|
||||||
|
action='store',
|
||||||
|
default='0.05',
|
||||||
|
type=float,
|
||||||
|
help='probabilities of join (default: 0.05)')
|
||||||
|
parser.add_argument(
|
||||||
|
'-l',
|
||||||
|
'--loop-per-thread',
|
||||||
|
action='store',
|
||||||
|
default='100',
|
||||||
|
type=int,
|
||||||
|
help='loop per thread (default: 100)')
|
||||||
|
parser.add_argument(
|
||||||
|
'-u',
|
||||||
|
'--user',
|
||||||
|
action='store',
|
||||||
|
default='root',
|
||||||
|
type=str,
|
||||||
|
help='user name')
|
||||||
|
parser.add_argument(
|
||||||
|
'-w',
|
||||||
|
'--password',
|
||||||
|
action='store',
|
||||||
|
default='root',
|
||||||
|
type=str,
|
||||||
|
help='user name')
|
||||||
|
parser.add_argument(
|
||||||
|
'-n',
|
||||||
|
'--number-of-tables',
|
||||||
|
action='store',
|
||||||
|
default=1000,
|
||||||
|
type=int,
|
||||||
|
help='Number of subtales per stable (default: 1000)')
|
||||||
|
parser.add_argument(
|
||||||
|
'-N',
|
||||||
|
'--number-of-stables',
|
||||||
|
action='store',
|
||||||
|
default=2,
|
||||||
|
type=int,
|
||||||
|
help='Number of stables (default: 2)')
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
q = ConcurrentInquiry(
|
||||||
|
args.ts,args.host_name,args.user,args.password,args.db_name,
|
||||||
|
args.stb_name_prefix,args.subtb_name_prefix,args.number_of_native_threads,args.number_of_rest_threads,
|
||||||
|
args.probabilities,args.loop_per_thread,args.number_of_stables,args.number_of_tables ,args.number_of_records )
|
||||||
|
|
||||||
|
if args.create_table:
|
||||||
|
q.gen_data()
|
||||||
q.get_full()
|
q.get_full()
|
||||||
|
|
||||||
#q.gen_query_sql()
|
#q.gen_query_sql()
|
||||||
q.run()
|
q.run()
|
||||||
|
|
||||||
|
|
|
@ -255,17 +255,24 @@ class TDDnode:
|
||||||
tdLog.exit(cmd)
|
tdLog.exit(cmd)
|
||||||
self.running = 1
|
self.running = 1
|
||||||
tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))
|
tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))
|
||||||
time.sleep(0.1)
|
if self.valgrind == 0:
|
||||||
key = 'from offline to online'
|
time.sleep(0.1)
|
||||||
bkey = bytes(key,encoding="utf8")
|
key = 'from offline to online'
|
||||||
logFile = self.logDir + "/taosdlog.0"
|
bkey = bytes(key,encoding="utf8")
|
||||||
popen = subprocess.Popen('tail -f ' + logFile, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
|
logFile = self.logDir + "/taosdlog.0"
|
||||||
while True:
|
popen = subprocess.Popen('tail -f ' + logFile, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
|
||||||
line = popen.stdout.readline().strip()
|
pid = popen.pid
|
||||||
if bkey in line:
|
print('Popen.pid:' + str(pid))
|
||||||
popen.kill()
|
while True:
|
||||||
break
|
line = popen.stdout.readline().strip()
|
||||||
tdLog.debug("the dnode:%d has been started." % (self.index))
|
if bkey in line:
|
||||||
|
print(line)
|
||||||
|
popen.kill()
|
||||||
|
break
|
||||||
|
tdLog.debug("the dnode:%d has been started." % (self.index))
|
||||||
|
else:
|
||||||
|
tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index))
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
# time.sleep(5)
|
# time.sleep(5)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue