update query script

This commit is contained in:
Ping Xiao 2022-11-04 15:39:11 +08:00
parent 4be6e0f55e
commit d2a675dbcb
1 changed files with 395 additions and 339 deletions

View File

@ -21,43 +21,52 @@ import argparse
import datetime import datetime
import string import string
from requests.auth import HTTPBasicAuth from requests.auth import HTTPBasicAuth
func_list=['avg','count','twa','sum','stddev','leastsquares','min',
'max','first','last','top','bottom','percentile','apercentile', func_list = ['abs', 'acos', 'asin', 'atan', 'ceil', 'cos', 'floor', 'log', 'pow', 'round', 'sin', 'sqrt', 'tan',
'last_row','diff','spread','distinct'] 'char_length', 'concat', 'concat_ws', 'length', 'lower', 'ltrim', 'rtrim', 'substr', 'upper',
condition_list=[ 'cast', 'to_iso8601', 'to_json', 'to_unixtimestamp', 'now', 'timediff', 'timetruncate', 'timezone', 'today',
'apercentile', 'avg', 'count', 'elapsed', 'leastsquares', 'spread', 'stddev', 'sum', 'hyperloglog', 'histogram', 'percentile',
'bottom', 'first', 'interp', 'last', 'last_row', 'max', 'min', 'mode', 'sample', 'tail', 'top', 'unique',
'csum', 'derivative', 'diff', 'irate', 'mavg', 'statecount', 'stateduration', 'twa',
'database', 'client_version', 'server_version', 'server_status']
condition_list = [
"where _c0 > now -10d ", "where _c0 > now -10d ",
'interval(10s)', 'interval(10s)',
'limit 10', 'limit 10',
'group by', 'group by',
'partition by',
'order by', 'order by',
'fill(null)' 'fill(null)'
] ]
where_list = ['_c0>now-10d',' <50','like',' is null','in']
where_list = ['_c0>now-10d', ' <50', 'like', ' is null', 'in']
class ConcurrentInquiry: class ConcurrentInquiry:
# def __init__(self,ts=1500000001000,host='127.0.0.1',user='root',password='taosdata',dbname='test', # def __init__(self,ts=1500000001000,host='127.0.0.1',user='root',password='taosdata',dbname='test',
# stb_prefix='st',subtb_prefix='t',n_Therads=10,r_Therads=10,probabilities=0.05,loop=5, # stb_prefix='st',subtb_prefix='t',n_Therads=10,r_Therads=10,probabilities=0.05,loop=5,
# stableNum = 2,subtableNum = 1000,insertRows = 100): # stableNum = 2,subtableNum = 1000,insertRows = 100):
def __init__(self,ts,host,user,password,dbname, def __init__(self, ts, host, user, password, dbname,
stb_prefix,subtb_prefix,n_Therads,r_Therads,probabilities,loop, stb_prefix, subtb_prefix, n_Therads, r_Therads, probabilities, loop,
stableNum ,subtableNum ,insertRows ,mix_table, replay): stableNum, subtableNum, insertRows, mix_table, replay):
self.n_numOfTherads = n_Therads self.n_numOfTherads = n_Therads
self.r_numOfTherads = r_Therads self.r_numOfTherads = r_Therads
self.ts=ts self.ts = ts
self.host = host self.host = host
self.user = user self.user = user
self.password = password self.password = password
self.dbname=dbname self.dbname = dbname
self.stb_prefix = stb_prefix self.stb_prefix = stb_prefix
self.subtb_prefix = subtb_prefix self.subtb_prefix = subtb_prefix
self.stb_list=[] self.stb_list = []
self.subtb_list=[] self.subtb_list = []
self.stb_stru_list=[] self.stb_stru_list = []
self.subtb_stru_list=[] self.subtb_stru_list = []
self.stb_tag_list=[] self.stb_tag_list = []
self.subtb_tag_list=[] self.subtb_tag_list = []
self.probabilities = [1-probabilities,probabilities] self.probabilities = [1-probabilities, probabilities]
self.ifjoin = [1,0] self.ifjoin = [1, 0]
self.loop = loop self.loop = loop
self.stableNum = stableNum self.stableNum = stableNum
self.subtableNum = subtableNum self.subtableNum = subtableNum
@ -66,253 +75,276 @@ class ConcurrentInquiry:
self.max_ts = datetime.datetime.now() self.max_ts = datetime.datetime.now()
self.min_ts = datetime.datetime.now() - datetime.timedelta(days=5) self.min_ts = datetime.datetime.now() - datetime.timedelta(days=5)
self.replay = replay self.replay = replay
def SetThreadsNum(self,num):
self.numOfTherads=num
def ret_fcol(self,cl,sql): #返回结果的第一列 def SetThreadsNum(self, num):
self.numOfTherads = num
def ret_fcol(self, cl, sql): # 返回结果的第一列
cl.execute(sql) cl.execute(sql)
fcol_list=[] fcol_list = []
for data in cl: for data in cl:
fcol_list.append(data[0]) fcol_list.append(data[0])
return fcol_list return fcol_list
def r_stb_list(self,cl): #返回超级表列表 def r_stb_list(self, cl): # 返回超级表列表
sql='show '+self.dbname+'.stables' sql = 'show '+self.dbname+'.stables'
self.stb_list=self.ret_fcol(cl,sql) self.stb_list = self.ret_fcol(cl, sql)
def r_subtb_list(self,cl,stablename): #每个超级表返回2个子表 def r_subtb_list(self, cl, stablename): # 每个超级表返回2个子表
sql='select tbname from '+self.dbname+'.'+stablename+' limit 2;' sql = 'select tbname from '+self.dbname+'.'+stablename+' limit 2;'
self.subtb_list+=self.ret_fcol(cl,sql) self.subtb_list += self.ret_fcol(cl, sql)
def cal_struct(self,cl,tbname): #查看表结构 def cal_struct(self, cl, tbname): # 查看表结构
tb=[] tb = []
tag=[] tag = []
sql='describe '+self.dbname+'.'+tbname+';' sql = 'describe '+self.dbname+'.'+tbname+';'
cl.execute(sql) cl.execute(sql)
for data in cl: for data in cl:
if data[3]: if data[3]:
tag.append(data[0]) tag.append(data[0])
else: else:
tb.append(data[0]) tb.append(data[0])
return tb,tag return tb, tag
def r_stb_stru(self,cl): #获取所有超级表的表结构 def r_stb_stru(self, cl): # 获取所有超级表的表结构
for i in self.stb_list: for i in self.stb_list:
tb,tag=self.cal_struct(cl,i) tb, tag = self.cal_struct(cl, i)
self.stb_stru_list.append(tb) self.stb_stru_list.append(tb)
self.stb_tag_list.append(tag) self.stb_tag_list.append(tag)
def r_subtb_stru(self,cl): #返回所有子表的表结构 def r_subtb_stru(self, cl): # 返回所有子表的表结构
for i in self.subtb_list: for i in self.subtb_list:
tb,tag=self.cal_struct(cl,i) tb, tag = self.cal_struct(cl, i)
self.subtb_stru_list.append(tb) self.subtb_stru_list.append(tb)
self.subtb_tag_list.append(tag) self.subtb_tag_list.append(tag)
def get_timespan(self,cl): #获取时间跨度(仅第一个超级表) def get_timespan(self, cl): # 获取时间跨度(仅第一个超级表)
sql = 'select first(_c0),last(_c0) from ' + self.dbname + '.' + self.stb_list[0] + ';' sql = 'select first(_c0),last(_c0) from ' + \
self.dbname + '.' + self.stb_list[0] + ';'
print(sql) print(sql)
cl.execute(sql) cl.execute(sql)
for data in cl: for data in cl:
self.max_ts = data[1] self.max_ts = data[1]
self.min_ts = data[0] self.min_ts = data[0]
def get_full(self): #获取所有的表、表结构 def get_full(self): # 获取所有的表、表结构
host = self.host host = self.host
user = self.user user = self.user
password = self.password password = self.password
conn = taos.connect( conn = taos.connect(
host, host='%s' % host,
user, user='%s' % user,
password, password='%s' % password,
) )
cl = conn.cursor() cl = conn.cursor()
self.r_stb_list(cl) self.r_stb_list(cl)
for i in self.stb_list: for i in self.stb_list:
self.r_subtb_list(cl,i) self.r_subtb_list(cl, i)
self.r_stb_stru(cl) self.r_stb_stru(cl)
self.r_subtb_stru(cl) self.r_subtb_stru(cl)
self.get_timespan(cl) self.get_timespan(cl)
cl.close() cl.close()
conn.close() conn.close()
#query condition # query condition
def con_where(self,tlist,col_list,tag_list): def con_where(self, tlist, col_list, tag_list):
l=[] l = []
for i in range(random.randint(0,len(tlist))): for i in range(random.randint(0, len(tlist))):
c = random.choice(where_list) c = random.choice(where_list)
if c == '_c0>now-10d': if c == '_c0>now-10d':
rdate = self.min_ts + (self.max_ts - self.min_ts)/10 * random.randint(-11,11) rdate = self.min_ts + \
conlist = ' _c0 ' + random.choice(['<','>','>=','<=','<>']) + "'" + str(rdate) + "'" (self.max_ts - self.min_ts)/10 * random.randint(-11, 11)
conlist = ' _c0 ' + \
random.choice(['<', '>', '>=', '<=', '<>']
) + "'" + str(rdate) + "'"
if self.random_pick(): if self.random_pick():
l.append(conlist) l.append(conlist)
else: l.append(c) else:
l.append(c)
elif '<50' in c: elif '<50' in c:
conlist = ' ' + random.choice(tlist) + random.choice(['<','>','>=','<=','<>']) + str(random.randrange(-100,100)) conlist = ' ' + random.choice(tlist) + random.choice(
['<', '>', '>=', '<=', '<>']) + str(random.randrange(-100, 100))
l.append(conlist) l.append(conlist)
elif 'is null' in c: elif 'is null' in c:
conlist = ' ' + random.choice(tlist) + random.choice([' is null',' is not null']) conlist = ' ' + \
random.choice(tlist) + \
random.choice([' is null', ' is not null'])
l.append(conlist) l.append(conlist)
elif 'in' in c: elif 'in' in c:
in_list = [] in_list = []
temp = [] temp = []
for i in range(random.randint(0,100)): for i in range(random.randint(0, 100)):
temp.append(random.randint(-10000,10000)) temp.append(random.randint(-10000, 10000))
temp = (str(i) for i in temp) temp = (str(i) for i in temp)
in_list.append(temp) in_list.append(temp)
temp1 = [] temp1 = []
for i in range(random.randint(0,100)): for i in range(random.randint(0, 100)):
temp1.append("'" + ''.join(random.sample(string.ascii_letters, random.randint(0,10))) + "'") temp1.append(
"'" + ''.join(random.sample(string.ascii_letters, random.randint(0, 10))) + "'")
in_list.append(temp1) in_list.append(temp1)
in_list.append(['NULL','NULL']) in_list.append(['NULL', 'NULL'])
conlist = ' ' + random.choice(tlist) + ' in (' + ','.join(random.choice(in_list)) + ')' conlist = ' ' + \
random.choice(tlist) + ' in (' + \
','.join(random.choice(in_list)) + ')'
l.append(conlist) l.append(conlist)
else: else:
s_all = string.ascii_letters s_all = string.ascii_letters
conlist = ' ' + random.choice(tlist) + " like \'%" + random.choice(s_all) + "%\' " conlist = ' ' + \
random.choice(tlist) + " like \'%" + \
random.choice(s_all) + "%\' "
l.append(conlist) l.append(conlist)
return 'where '+random.choice([' and ',' or ']).join(l) return 'where '+random.choice([' and ', ' or ']).join(l)
def con_interval(self,tlist,col_list,tag_list): def con_interval(self, tlist, col_list, tag_list):
interval = 'interval(' + str(random.randint(0,20)) + random.choice(['a','s','d','w','n','y']) + ')' interval = 'interval(' + str(random.randint(0, 20)) + \
random.choice(['a', 's', 'd', 'w', 'n', 'y']) + ')'
return interval return interval
def con_limit(self,tlist,col_list,tag_list): def con_limit(self, tlist, col_list, tag_list):
rand1 = str(random.randint(0,1000)) rand1 = str(random.randint(0, 1000))
rand2 = str(random.randint(0,1000)) rand2 = str(random.randint(0, 1000))
return random.choice(['limit ' + rand1,'limit ' + rand1 + ' offset '+rand2, return random.choice(['limit ' + rand1, 'limit ' + rand1 + ' offset '+rand2,
' slimit ' + rand1,' slimit ' + rand1 + ' offset ' + rand2,'limit '+rand1 + ' slimit '+ rand2, ' slimit ' + rand1, ' slimit ' + rand1 + ' offset ' +
'limit '+ rand1 + ' offset' + rand2 + ' slimit '+ rand1 + ' soffset ' + rand2 ]) rand2, 'limit '+rand1 + ' slimit ' + rand2,
'limit ' + rand1 + ' offset' + rand2 + ' slimit ' + rand1 + ' soffset ' + rand2])
def con_fill(self,tlist,col_list,tag_list): def con_fill(self, tlist, col_list, tag_list):
return random.choice(['fill(null)','fill(prev)','fill(none)','fill(LINEAR)']) return random.choice(['fill(null)', 'fill(prev)', 'fill(none)', 'fill(LINEAR)'])
def con_group(self,tlist,col_list,tag_list): def con_group(self, tlist, col_list, tag_list):
rand_tag = random.randint(0,5) rand_tag = random.randint(0, 5)
rand_col = random.randint(0,1) rand_col = random.randint(0, 1)
if len(tag_list): if len(tag_list):
return 'group by '+','.join(random.sample(col_list,rand_col) + random.sample(tag_list,rand_tag)) return 'group by '+','.join(random.sample(col_list, rand_col) + random.sample(tag_list, rand_tag))
else: else:
return 'group by '+','.join(random.sample(col_list,rand_col)) return 'group by '+','.join(random.sample(col_list, rand_col))
def con_order(self,tlist,col_list,tag_list): def con_order(self, tlist, col_list, tag_list):
return 'order by '+random.choice(tlist) return 'order by '+random.choice(tlist)
def con_state_window(self,tlist,col_list,tag_list): def con_state_window(self, tlist, col_list, tag_list):
return 'state_window(' + random.choice(tlist + tag_list) + ')' return 'state_window(' + random.choice(tlist + tag_list) + ')'
def con_session_window(self,tlist,col_list,tag_list): def con_session_window(self, tlist, col_list, tag_list):
session_window = 'session_window(' + random.choice(tlist + tag_list) + ',' + str(random.randint(0,20)) + random.choice(['a','s','d','w','n','y']) + ')' session_window = 'session_window(' + random.choice(tlist + tag_list) + ',' + str(
random.randint(0, 20)) + random.choice(['a', 's', 'd', 'w', 'n', 'y']) + ')'
return session_window return session_window
def gen_subquery_sql(self): def gen_subquery_sql(self):
subsql ,col_num = self.gen_query_sql(1) subsql, col_num = self.gen_query_sql(1)
if col_num == 0: if col_num == 0:
return 0 return 0
col_list=[] col_list = []
tag_list=[] tag_list = []
for i in range(col_num): for i in range(col_num):
col_list.append("taosd%d"%i) col_list.append("taosd%d" % i)
tlist=col_list+['abc'] #增加不存在的域'abc'是否会引起新bug tlist = col_list+['abc'] # 增加不存在的域'abc'是否会引起新bug
con_rand=random.randint(0,len(condition_list)) con_rand = random.randint(0, len(condition_list))
func_rand=random.randint(0,len(func_list)) func_rand = random.randint(0, len(func_list))
col_rand=random.randint(0,len(col_list)) col_rand = random.randint(0, len(col_list))
t_rand=random.randint(0,len(tlist)) t_rand = random.randint(0, len(tlist))
sql='select ' #select sql = 'select ' # select
random.shuffle(col_list) random.shuffle(col_list)
random.shuffle(func_list) random.shuffle(func_list)
sel_col_list=[] sel_col_list = []
col_rand=random.randint(0,len(col_list)) col_rand = random.randint(0, len(col_list))
loop = 0 loop = 0
for i,j in zip(col_list[0:col_rand],func_list): #决定每个被查询col的函数 for i, j in zip(col_list[0:col_rand], func_list): # 决定每个被查询col的函数
alias = ' as '+ 'sub%d ' % loop alias = ' as ' + 'sub%d ' % loop
loop += 1 loop += 1
pick_func = '' pick_func = ''
if j == 'leastsquares': if j == 'leastsquares':
pick_func=j+'('+i+',1,1)' pick_func = j+'('+i+',1,1)'
elif j == 'top' or j == 'bottom' or j == 'percentile' or j == 'apercentile': elif j == 'top' or j == 'bottom' or j == 'percentile' or j == 'apercentile':
pick_func=j+'('+i+',1)' pick_func = j+'('+i+',1)'
else: else:
pick_func=j+'('+i+')' pick_func = j+'('+i+')'
if bool(random.getrandbits(1)) : if bool(random.getrandbits(1)):
pick_func+=alias pick_func += alias
sel_col_list.append(pick_func) sel_col_list.append(pick_func)
if col_rand == 0: if col_rand == 0:
sql = sql + '*' sql = sql + '*'
else: else:
sql=sql+','.join(sel_col_list) #select col & func sql = sql+','.join(sel_col_list) # select col & func
sql = sql + ' from ('+ subsql +') ' sql = sql + ' from (' + subsql + ') '
con_func=[self.con_where,self.con_interval,self.con_limit,self.con_group,self.con_order,self.con_fill,self.con_state_window,self.con_session_window] con_func = [self.con_where, self.con_interval, self.con_limit, self.con_group,
sel_con=random.sample(con_func,random.randint(0,len(con_func))) self.con_order, self.con_fill, self.con_state_window, self.con_session_window]
sel_con_list=[] sel_con = random.sample(con_func, random.randint(0, len(con_func)))
sel_con_list = []
for i in sel_con: for i in sel_con:
sel_con_list.append(i(tlist,col_list,tag_list)) #获取对应的条件函数 sel_con_list.append(i(tlist, col_list, tag_list)) # 获取对应的条件函数
sql+=' '.join(sel_con_list) # condition # condition
#print(sql) sql += ' '.join(sel_con_list)
# print(sql)
return sql return sql
def gen_query_sql(self,subquery=0): #生成查询语句 def gen_query_sql(self, subquery=0): # 生成查询语句
tbi=random.randint(0,len(self.subtb_list)+len(self.stb_list)) #随机决定查询哪张表 tbi = random.randint(0, len(self.subtb_list) +
tbname='' len(self.stb_list)) # 随机决定查询哪张表
col_list=[] tbname = ''
tag_list=[] col_list = []
is_stb=0 tag_list = []
if tbi>len(self.stb_list) : is_stb = 0
tbi=tbi-len(self.stb_list) if tbi > len(self.stb_list):
tbname=self.subtb_list[tbi-1] tbi = tbi-len(self.stb_list)
col_list=self.subtb_stru_list[tbi-1] tbname = self.subtb_list[tbi-1]
tag_list=self.subtb_tag_list[tbi-1] col_list = self.subtb_stru_list[tbi-1]
tag_list = self.subtb_tag_list[tbi-1]
else: else:
tbname=self.stb_list[tbi-1] tbname = self.stb_list[tbi-1]
col_list=self.stb_stru_list[tbi-1] col_list = self.stb_stru_list[tbi-1]
tag_list=self.stb_tag_list[tbi-1] tag_list = self.stb_tag_list[tbi-1]
is_stb=1 is_stb = 1
tlist=col_list+tag_list+['abc'] #增加不存在的域'abc'是否会引起新bug tlist = col_list+tag_list+['abc'] # 增加不存在的域'abc'是否会引起新bug
con_rand=random.randint(0,len(condition_list)) con_rand = random.randint(0, len(condition_list))
func_rand=random.randint(0,len(func_list)) func_rand = random.randint(0, len(func_list))
col_rand=random.randint(0,len(col_list)) col_rand = random.randint(0, len(col_list))
tag_rand=random.randint(0,len(tag_list)) tag_rand = random.randint(0, len(tag_list))
t_rand=random.randint(0,len(tlist)) t_rand = random.randint(0, len(tlist))
sql='select ' #select sql = 'select ' # select
random.shuffle(col_list) random.shuffle(col_list)
random.shuffle(func_list) random.shuffle(func_list)
sel_col_list=[] sel_col_list = []
col_rand=random.randint(0,len(col_list)) col_rand = random.randint(0, len(col_list))
loop = 0 loop = 0
for i,j in zip(col_list[0:col_rand],func_list): #决定每个被查询col的函数 for i, j in zip(col_list[0:col_rand], func_list): # 决定每个被查询col的函数
alias = ' as '+ 'taos%d ' % loop alias = ' as ' + 'taos%d ' % loop
loop += 1 loop += 1
pick_func = '' pick_func = ''
if j == 'leastsquares': if j == 'leastsquares':
pick_func=j+'('+i+',1,1)' pick_func = j+'('+i+',1,1)'
elif j == 'top' or j == 'bottom' or j == 'percentile' or j == 'apercentile': elif j == 'top' or j == 'bottom' or j == 'percentile' or j == 'apercentile':
pick_func=j+'('+i+',1)' pick_func = j+'('+i+',1)'
else: else:
pick_func=j+'('+i+')' pick_func = j+'('+i+')'
if bool(random.getrandbits(1)) | subquery : if bool(random.getrandbits(1)) | subquery:
pick_func+=alias pick_func += alias
sel_col_list.append(pick_func) sel_col_list.append(pick_func)
if col_rand == 0 & subquery : if col_rand == 0 & subquery:
sql = sql + '*' sql = sql + '*'
else: else:
sql=sql+','.join(sel_col_list) #select col & func sql = sql+','.join(sel_col_list) # select col & func
if self.mix_table == 0: if self.mix_table == 0:
sql = sql + ' from '+random.choice(self.stb_list+self.subtb_list)+' ' sql = sql + ' from ' + \
random.choice(self.stb_list+self.subtb_list)+' '
elif self.mix_table == 1: elif self.mix_table == 1:
sql = sql + ' from '+random.choice(self.subtb_list)+' ' sql = sql + ' from '+random.choice(self.subtb_list)+' '
else: else:
sql = sql + ' from '+random.choice(self.stb_list)+' ' sql = sql + ' from '+random.choice(self.stb_list)+' '
con_func=[self.con_where,self.con_interval,self.con_limit,self.con_group,self.con_order,self.con_fill,self.con_state_window,self.con_session_window] con_func = [self.con_where, self.con_interval, self.con_limit, self.con_group,
sel_con=random.sample(con_func,random.randint(0,len(con_func))) self.con_order, self.con_fill, self.con_state_window, self.con_session_window]
sel_con_list=[] sel_con = random.sample(con_func, random.randint(0, len(con_func)))
sel_con_list = []
for i in sel_con: for i in sel_con:
sel_con_list.append(i(tlist,col_list,tag_list)) #获取对应的条件函数 sel_con_list.append(i(tlist, col_list, tag_list)) # 获取对应的条件函数
sql+=' '.join(sel_con_list) # condition # condition
#print(sql) sql += ' '.join(sel_con_list)
return (sql,loop) # print(sql)
return (sql, loop)
def gen_query_join(self): #生成join查询语句 def gen_query_join(self): # 生成join查询语句
tbname = [] tbname = []
col_list = [] col_list = []
tag_list = [] tag_list = []
col_intersection = [] col_intersection = []
@ -321,86 +353,103 @@ class ConcurrentInquiry:
if self.mix_table == 0: if self.mix_table == 0:
if bool(random.getrandbits(1)): if bool(random.getrandbits(1)):
subtable = True subtable = True
tbname = random.sample(self.subtb_list,2) tbname = random.sample(self.subtb_list, 2)
for i in tbname: for i in tbname:
col_list.append(self.subtb_stru_list[self.subtb_list.index(i)]) col_list.append(
tag_list.append(self.subtb_stru_list[self.subtb_list.index(i)]) self.subtb_stru_list[self.subtb_list.index(i)])
col_intersection = list(set(col_list[0]).intersection(set(col_list[1]))) tag_list.append(
tag_intersection = list(set(tag_list[0]).intersection(set(tag_list[1]))) self.subtb_stru_list[self.subtb_list.index(i)])
col_intersection = list(
set(col_list[0]).intersection(set(col_list[1])))
tag_intersection = list(
set(tag_list[0]).intersection(set(tag_list[1])))
else: else:
tbname = random.sample(self.stb_list,2) tbname = random.sample(self.stb_list, 2)
for i in tbname: for i in tbname:
col_list.append(self.stb_stru_list[self.stb_list.index(i)]) col_list.append(self.stb_stru_list[self.stb_list.index(i)])
tag_list.append(self.stb_stru_list[self.stb_list.index(i)]) tag_list.append(self.stb_stru_list[self.stb_list.index(i)])
col_intersection = list(set(col_list[0]).intersection(set(col_list[1]))) col_intersection = list(
tag_intersection = list(set(tag_list[0]).intersection(set(tag_list[1]))) set(col_list[0]).intersection(set(col_list[1])))
tag_intersection = list(
set(tag_list[0]).intersection(set(tag_list[1])))
elif self.mix_table == 1: elif self.mix_table == 1:
subtable = True subtable = True
tbname = random.sample(self.subtb_list,2) tbname = random.sample(self.subtb_list, 2)
for i in tbname: for i in tbname:
col_list.append(self.subtb_stru_list[self.subtb_list.index(i)]) col_list.append(self.subtb_stru_list[self.subtb_list.index(i)])
tag_list.append(self.subtb_stru_list[self.subtb_list.index(i)]) tag_list.append(self.subtb_stru_list[self.subtb_list.index(i)])
col_intersection = list(set(col_list[0]).intersection(set(col_list[1]))) col_intersection = list(
tag_intersection = list(set(tag_list[0]).intersection(set(tag_list[1]))) set(col_list[0]).intersection(set(col_list[1])))
tag_intersection = list(
set(tag_list[0]).intersection(set(tag_list[1])))
else: else:
tbname = random.sample(self.stb_list,2) tbname = random.sample(self.stb_list, 2)
for i in tbname: for i in tbname:
col_list.append(self.stb_stru_list[self.stb_list.index(i)]) col_list.append(self.stb_stru_list[self.stb_list.index(i)])
tag_list.append(self.stb_stru_list[self.stb_list.index(i)]) tag_list.append(self.stb_stru_list[self.stb_list.index(i)])
col_intersection = list(set(col_list[0]).intersection(set(col_list[1]))) col_intersection = list(
tag_intersection = list(set(tag_list[0]).intersection(set(tag_list[1]))) set(col_list[0]).intersection(set(col_list[1])))
con_rand=random.randint(0,len(condition_list)) tag_intersection = list(
col_rand=random.randint(0,len(col_list)) set(tag_list[0]).intersection(set(tag_list[1])))
tag_rand=random.randint(0,len(tag_list)) con_rand = random.randint(0, len(condition_list))
sql='select ' #select col_rand = random.randint(0, len(col_list))
tag_rand = random.randint(0, len(tag_list))
sql = 'select ' # select
sel_col_tag=[] sel_col_tag = []
col_rand=random.randint(0,len(col_list)) col_rand = random.randint(0, len(col_list))
if bool(random.getrandbits(1)): if bool(random.getrandbits(1)):
sql += '*' sql += '*'
else: else:
sel_col_tag.append('t1.' + str(random.choice(col_list[0] + tag_list[0]))) sel_col_tag.append(
sel_col_tag.append('t2.' + str(random.choice(col_list[1] + tag_list[1]))) 't1.' + str(random.choice(col_list[0] + tag_list[0])))
sel_col_tag.append(
't2.' + str(random.choice(col_list[1] + tag_list[1])))
sel_col_list = [] sel_col_list = []
random.shuffle(func_list) random.shuffle(func_list)
if self.random_pick(): if self.random_pick():
loop = 0 loop = 0
for i,j in zip(sel_col_tag,func_list): #决定每个被查询col的函数 for i, j in zip(sel_col_tag, func_list): # 决定每个被查询col的函数
alias = ' as '+ 'taos%d ' % loop alias = ' as ' + 'taos%d ' % loop
loop += 1 loop += 1
pick_func = '' pick_func = ''
if j == 'leastsquares': if j == 'leastsquares':
pick_func=j+'('+i+',1,1)' pick_func = j+'('+i+',1,1)'
elif j == 'top' or j == 'bottom' or j == 'percentile' or j == 'apercentile': elif j == 'top' or j == 'bottom' or j == 'percentile' or j == 'apercentile':
pick_func=j+'('+i+',1)' pick_func = j+'('+i+',1)'
else: else:
pick_func=j+'('+i+')' pick_func = j+'('+i+')'
if bool(random.getrandbits(1)): if bool(random.getrandbits(1)):
pick_func+=alias pick_func += alias
sel_col_list.append(pick_func) sel_col_list.append(pick_func)
sql += ','.join(sel_col_list) sql += ','.join(sel_col_list)
else: else:
sql += ','.join(sel_col_tag) sql += ','.join(sel_col_tag)
sql = sql + ' from '+ str(tbname[0]) +' t1,' + str(tbname[1]) + ' t2 ' #select col & func sql = sql + ' from ' + \
str(tbname[0]) + ' t1,' + str(tbname[1]) + \
' t2 ' # select col & func
join_section = None join_section = None
temp = None temp = None
if subtable: if subtable:
temp = random.choices(col_intersection) temp = random.choices(col_intersection)
join_section = temp.pop() join_section = temp.pop()
sql += 'where t1._c0 = t2._c0 and ' + 't1.' + str(join_section) + '=t2.' + str(join_section) sql += 'where t1._c0 = t2._c0 and ' + 't1.' + \
str(join_section) + '=t2.' + str(join_section)
else: else:
temp = random.choices(col_intersection+tag_intersection) temp = random.choices(col_intersection+tag_intersection)
join_section = temp.pop() join_section = temp.pop()
sql += 'where t1._c0 = t2._c0 and ' + 't1.' + str(join_section) + '=t2.' + str(join_section) sql += 'where t1._c0 = t2._c0 and ' + 't1.' + \
str(join_section) + '=t2.' + str(join_section)
return sql return sql
def random_pick(self): def random_pick(self):
x = random.uniform(0,1) x = random.uniform(0, 1)
cumulative_probability = 0.0 cumulative_probability = 0.0
for item, item_probability in zip(self.ifjoin, self.probabilities): for item, item_probability in zip(self.ifjoin, self.probabilities):
cumulative_probability += item_probability cumulative_probability += item_probability
if x < cumulative_probability:break if x < cumulative_probability:
break
return item return item
def gen_data(self): def gen_data(self):
@ -412,52 +461,54 @@ class ConcurrentInquiry:
user = self.user user = self.user
password = self.password password = self.password
conn = taos.connect( conn = taos.connect(
host, host='%s' % host,
user, user='%s' % user,
password, password='%s' % password,
) )
cl = conn.cursor() cl = conn.cursor()
cl.execute("drop database if exists %s;" %self.dbname) cl.execute("drop database if exists %s;" % self.dbname)
cl.execute("create database if not exists %s;" %self.dbname) cl.execute("create database if not exists %s;" % self.dbname)
cl.execute("use %s" % self.dbname) cl.execute("use %s" % self.dbname)
for k in range(stableNum): for k in range(stableNum):
sql="create table %s (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool,c8 binary(20),c9 nchar(20),c11 int unsigned,c12 smallint unsigned,c13 tinyint unsigned,c14 bigint unsigned) \ sql = "create table %s (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool,c8 binary(20),c9 nchar(20),c11 int unsigned,c12 smallint unsigned,c13 tinyint unsigned,c14 bigint unsigned) \
tags(t1 int, t2 float, t3 bigint, t4 smallint, t5 tinyint, t6 double, t7 bool,t8 binary(20),t9 nchar(20), t11 int unsigned , t12 smallint unsigned , t13 tinyint unsigned , t14 bigint unsigned)" % (self.stb_prefix+str(k)) tags(t1 int, t2 float, t3 bigint, t4 smallint, t5 tinyint, t6 double, t7 bool,t8 binary(20),t9 nchar(20), t11 int unsigned , t12 smallint unsigned , t13 tinyint unsigned , t14 bigint unsigned)" % (self.stb_prefix+str(k))
cl.execute(sql) cl.execute(sql)
for j in range(subtableNum): for j in range(subtableNum):
if j % 100 == 0: if j % 100 == 0:
sql = "create table %s using %s tags(NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)" % \ sql = "create table %s using %s tags(NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)" % \
(self.subtb_prefix+str(k)+'_'+str(j),self.stb_prefix+str(k)) (self.subtb_prefix+str(k)+'_' +
str(j), self.stb_prefix+str(k))
else: else:
sql = "create table %s using %s tags(%d,%d,%d,%d,%d,%d,%d,'%s','%s',%d,%d,%d,%d)" % \ sql = "create table %s using %s tags(%d,%d,%d,%d,%d,%d,%d,'%s','%s',%d,%d,%d,%d)" % \
(self.subtb_prefix+str(k)+'_'+str(j),self.stb_prefix+str(k),j,j/2.0,j%41,j%51,j%53,j*1.0,j%2,'taos'+str(j),'涛思'+str(j), j%43, j%23 , j%17 , j%3167) (self.subtb_prefix+str(k)+'_'+str(j), self.stb_prefix+str(k), j, j/2.0, j % 41, j %
51, j % 53, j*1.0, j % 2, 'taos'+str(j), '涛思'+str(j), j % 43, j % 23, j % 17, j % 3167)
print(sql) print(sql)
cl.execute(sql) cl.execute(sql)
for i in range(insertRows): for i in range(insertRows):
if i % 100 == 0 : if i % 100 == 0:
ret = cl.execute( ret = cl.execute(
"insert into %s values (%d , NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)" % "insert into %s values (%d , NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)" %
(self.subtb_prefix+str(k)+'_'+str(j), t0+i)) (self.subtb_prefix+str(k)+'_'+str(j), t0+i))
else: else:
ret = cl.execute( ret = cl.execute(
"insert into %s values (%d , %d,%d,%d,%d,%d,%d,%d,'%s','%s',%d,%d,%d,%d)" % "insert into %s values (%d , %d,%d,%d,%d,%d,%d,%d,'%s','%s',%d,%d,%d,%d)" %
(self.subtb_prefix+str(k)+'_'+str(j), t0+i, i%100, i/2.0, i%41, i%51, i%53, i*1.0, i%2,'taos'+str(i),'涛思'+str(i), i%43, i%23 , i%17 , i%3167)) (self.subtb_prefix+str(k)+'_'+str(j), t0+i, i % 100, i/2.0, i % 41, i % 51, i % 53, i*1.0, i % 2, 'taos'+str(i), '涛思'+str(i), i % 43, i % 23, i % 17, i % 3167))
cl.close() cl.close()
conn.close() conn.close()
def rest_query(self,sql): #rest 接口 def rest_query(self, sql): # rest 接口
host = self.host host = self.host
user = self.user user = self.user
password = self.password password = self.password
port =6041 port = 6041
url = "http://{}:{}/rest/sql".format(host, port ) url = "http://{}:{}/rest/sql".format(host, port)
try: try:
r = requests.post(url, r = requests.post(url,
data = 'use %s' % self.dbname, data='use %s' % self.dbname,
auth = HTTPBasicAuth('root', 'taosdata')) auth=HTTPBasicAuth('root', 'taosdata'))
r = requests.post(url, r = requests.post(url,
data = sql, data=sql,
auth = HTTPBasicAuth('root', 'taosdata')) auth=HTTPBasicAuth('root', 'taosdata'))
except: except:
print("REST API Failure (TODO: more info here)") print("REST API Failure (TODO: more info here)")
raise raise
@ -481,165 +532,171 @@ class ConcurrentInquiry:
nRows = rj['rows'] if ('rows' in rj) else 0 nRows = rj['rows'] if ('rows' in rj) else 0
return nRows return nRows
def query_thread_n(self, threadID): # 使用原生python接口查询
def query_thread_n(self,threadID): #使用原生python接口查询
host = self.host host = self.host
user = self.user user = self.user
password = self.password password = self.password
conn = taos.connect( conn = taos.connect(
host, host='%s' % host,
user, user='%s' % user,
password, password='%s' % password,
) )
cl = conn.cursor() cl = conn.cursor()
cl.execute("use %s;" % self.dbname) cl.execute("use %s;" % self.dbname)
fo = open('bak_sql_n_%d'%threadID,'w+') fo = open('bak_sql_n_%d' % threadID, 'w+')
print("Thread %d: starting" % threadID) print("Thread %d: starting" % threadID)
loop = self.loop loop = self.loop
while loop: while loop:
try: try:
if self.random_pick():
if self.random_pick(): if self.random_pick():
if self.random_pick(): sql, temp = self.gen_query_sql()
sql,temp=self.gen_query_sql()
else:
sql = self.gen_subquery_sql()
else: else:
sql = self.gen_query_join() sql = self.gen_subquery_sql()
print("sql is ",sql) else:
fo.write(sql+'\n') sql = self.gen_query_join()
start = time.time() print("sql is ", sql)
cl.execute(sql) fo.write(sql+'\n')
cl.fetchall() start = time.time()
end = time.time() cl.execute(sql)
print("time cost :",end-start) cl.fetchall()
except Exception as e: end = time.time()
print('-'*40) print("time cost :", end-start)
print( except Exception as e:
"Failure thread%d, sql: %s \nexception: %s" % print('-'*40)
(threadID, str(sql),str(e))) print(
err_uec='Unable to establish connection' "Failure thread%d, sql: %s \nexception: %s" %
if err_uec in str(e) and loop >0: (threadID, str(sql), str(e)))
exit(-1) err_uec = 'Unable to establish connection'
loop -= 1 if err_uec in str(e) and loop > 0:
if loop == 0: break exit(-1)
loop -= 1
if loop == 0:
break
fo.close() fo.close()
cl.close() cl.close()
conn.close() conn.close()
print("Thread %d: finishing" % threadID) print("Thread %d: finishing" % threadID)
def query_thread_nr(self,threadID): #使用原生python接口进行重放 def query_thread_nr(self, threadID): # 使用原生python接口进行重放
host = self.host host = self.host
user = self.user user = self.user
password = self.password password = self.password
conn = taos.connect( conn = taos.connect(
host, host='%s' % host,
user, user='%s' % user,
password, password='%s' % password,
) )
cl = conn.cursor() cl = conn.cursor()
cl.execute("use %s;" % self.dbname) cl.execute("use %s;" % self.dbname)
replay_sql = [] replay_sql = []
with open('bak_sql_n_%d'%threadID,'r') as f: with open('bak_sql_n_%d' % threadID, 'r') as f:
replay_sql = f.readlines() replay_sql = f.readlines()
print("Replay Thread %d: starting" % threadID) print("Replay Thread %d: starting" % threadID)
for sql in replay_sql: for sql in replay_sql:
try: try:
print("sql is ",sql) print("sql is ", sql)
start = time.time() start = time.time()
cl.execute(sql) cl.execute(sql)
cl.fetchall() cl.fetchall()
end = time.time() end = time.time()
print("time cost :",end-start) print("time cost :", end-start)
except Exception as e: except Exception as e:
print('-'*40) print('-'*40)
print( print(
"Failure thread%d, sql: %s \nexception: %s" % "Failure thread%d, sql: %s \nexception: %s" %
(threadID, str(sql),str(e))) (threadID, str(sql), str(e)))
err_uec='Unable to establish connection' err_uec = 'Unable to establish connection'
if err_uec in str(e) and loop >0: if err_uec in str(e) and loop > 0:
exit(-1) exit(-1)
cl.close() cl.close()
conn.close() conn.close()
print("Replay Thread %d: finishing" % threadID) print("Replay Thread %d: finishing" % threadID)
def query_thread_r(self,threadID): #使用rest接口查询 def query_thread_r(self, threadID): # 使用rest接口查询
print("Thread %d: starting" % threadID) print("Thread %d: starting" % threadID)
fo = open('bak_sql_r_%d'%threadID,'w+') fo = open('bak_sql_r_%d' % threadID, 'w+')
loop = self.loop loop = self.loop
while loop: while loop:
try: try:
if self.random_pick(): if self.random_pick():
if self.random_pick(): if self.random_pick():
sql,temp=self.gen_query_sql() sql, temp = self.gen_query_sql()
else: else:
sql = self.gen_subquery_sql() sql = self.gen_subquery_sql()
else: else:
sql = self.gen_query_join() sql = self.gen_query_join()
print("sql is ",sql) print("sql is ", sql)
fo.write(sql+'\n') fo.write(sql+'\n')
start = time.time() start = time.time()
self.rest_query(sql) self.rest_query(sql)
end = time.time() end = time.time()
print("time cost :",end-start) print("time cost :", end-start)
except Exception as e: except Exception as e:
print('-'*40) print('-'*40)
print( print(
"Failure thread%d, sql: %s \nexception: %s" % "Failure thread%d, sql: %s \nexception: %s" %
(threadID, str(sql),str(e))) (threadID, str(sql), str(e)))
err_uec='Unable to establish connection' err_uec = 'Unable to establish connection'
if err_uec in str(e) and loop >0: if err_uec in str(e) and loop > 0:
exit(-1) exit(-1)
loop -= 1 loop -= 1
if loop == 0: break if loop == 0:
break
fo.close() fo.close()
print("Thread %d: finishing" % threadID) print("Thread %d: finishing" % threadID)
def query_thread_rr(self,threadID): #使用rest接口重放 def query_thread_rr(self, threadID): # 使用rest接口重放
print("Replay Thread %d: starting" % threadID) print("Replay Thread %d: starting" % threadID)
replay_sql = [] replay_sql = []
with open('bak_sql_r_%d'%threadID,'r') as f: with open('bak_sql_r_%d' % threadID, 'r') as f:
replay_sql = f.readlines() replay_sql = f.readlines()
for sql in replay_sql: for sql in replay_sql:
try: try:
print("sql is ",sql) print("sql is ", sql)
start = time.time() start = time.time()
self.rest_query(sql) self.rest_query(sql)
end = time.time() end = time.time()
print("time cost :",end-start) print("time cost :", end-start)
except Exception as e: except Exception as e:
print('-'*40) print('-'*40)
print( print(
"Failure thread%d, sql: %s \nexception: %s" % "Failure thread%d, sql: %s \nexception: %s" %
(threadID, str(sql),str(e))) (threadID, str(sql), str(e)))
err_uec='Unable to establish connection' err_uec = 'Unable to establish connection'
if err_uec in str(e) and loop >0: if err_uec in str(e) and loop > 0:
exit(-1) exit(-1)
print("Replay Thread %d: finishing" % threadID) print("Replay Thread %d: finishing" % threadID)
def run(self): def run(self):
print(self.n_numOfTherads,self.r_numOfTherads) print(self.n_numOfTherads, self.r_numOfTherads)
threads = [] threads = []
if self.replay: #whether replay if self.replay: # whether replay
for i in range(self.n_numOfTherads): for i in range(self.n_numOfTherads):
thread = threading.Thread(target=self.query_thread_nr, args=(i,)) thread = threading.Thread(
target=self.query_thread_nr, args=(i,))
threads.append(thread) threads.append(thread)
thread.start() thread.start()
for i in range(self.r_numOfTherads): for i in range(self.r_numOfTherads):
thread = threading.Thread(target=self.query_thread_rr, args=(i,)) thread = threading.Thread(
target=self.query_thread_rr, args=(i,))
threads.append(thread) threads.append(thread)
thread.start() thread.start()
else: else:
for i in range(self.n_numOfTherads): for i in range(self.n_numOfTherads):
thread = threading.Thread(target=self.query_thread_n, args=(i,)) thread = threading.Thread(
target=self.query_thread_n, args=(i,))
threads.append(thread) threads.append(thread)
thread.start() thread.start()
for i in range(self.r_numOfTherads): for i in range(self.r_numOfTherads):
thread = threading.Thread(target=self.query_thread_r, args=(i,)) thread = threading.Thread(
target=self.query_thread_r, args=(i,))
threads.append(thread) threads.append(thread)
thread.start() thread.start()
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument( parser.add_argument(
'-H', '-H',
@ -729,9 +786,9 @@ parser.add_argument(
'-w', '-w',
'--password', '--password',
action='store', action='store',
default='root', default='taosdata',
type=str, type=str,
help='user name') help='password')
parser.add_argument( parser.add_argument(
'-n', '-n',
'--number-of-tables', '--number-of-tables',
@ -763,15 +820,14 @@ parser.add_argument(
args = parser.parse_args() args = parser.parse_args()
q = ConcurrentInquiry( q = ConcurrentInquiry(
args.ts,args.host_name,args.user,args.password,args.db_name, args.ts, args.host_name, args.user, args.password, args.db_name,
args.stb_name_prefix,args.subtb_name_prefix,args.number_of_native_threads,args.number_of_rest_threads, args.stb_name_prefix, args.subtb_name_prefix, args.number_of_native_threads, args.number_of_rest_threads,
args.probabilities,args.loop_per_thread,args.number_of_stables,args.number_of_tables ,args.number_of_records, args.probabilities, args.loop_per_thread, args.number_of_stables, args.number_of_tables, args.number_of_records,
args.mix_stable_subtable, args.replay ) args.mix_stable_subtable, args.replay)
if args.create_table: if args.create_table:
q.gen_data() q.gen_data()
q.get_full() q.get_full()
#q.gen_query_sql() # q.gen_query_sql()
q.run() q.run()