757 lines
		
	
	
		
			29 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			757 lines
		
	
	
		
			29 KiB
		
	
	
	
		
			Python
		
	
	
	
| ###################################################################
 | ||
| #           Copyright (c) 2016 by TAOS Technologies, Inc.
 | ||
| #                     All rights reserved.
 | ||
| #
 | ||
| #  This file is proprietary and confidential to TAOS Technologies.
 | ||
| #  No part of this file may be reproduced, stored, transmitted,
 | ||
| #  disclosed or used in any form or by any means other than as
 | ||
| #  expressly provided by the written permission from Jianhui Tao
 | ||
| #
 | ||
| ###################################################################
 | ||
| 
 | ||
| # -*- coding: utf-8 -*-
 | ||
| import threading
 | ||
| import taos
 | ||
| import sys
 | ||
| import json
 | ||
| import time
 | ||
| import random
 | ||
| import requests
 | ||
| import argparse
 | ||
| import datetime
 | ||
| import string
 | ||
| from requests.auth import HTTPBasicAuth
 | ||
| func_list=['avg','count','twa','sum','stddev','leastsquares','min',
 | ||
| 'max','first','last','top','bottom','percentile','apercentile',
 | ||
| 'last_row','diff','spread']
 | ||
| condition_list=[
 | ||
|     "where _c0 > now -10d ",
 | ||
|     'interval(10s)',
 | ||
|     'limit 10',
 | ||
|     'group by',
 | ||
|     'order by',
 | ||
|     'fill(null)'
 | ||
|     
 | ||
| ]
 | ||
| where_list = ['_c0>now-10d',' <50','like',' is null']
 | ||
| class ConcurrentInquiry:
 | ||
|     # def __init__(self,ts=1500000001000,host='127.0.0.1',user='root',password='taosdata',dbname='test',
 | ||
|     #             stb_prefix='st',subtb_prefix='t',n_Therads=10,r_Therads=10,probabilities=0.05,loop=5,
 | ||
|     #             stableNum = 2,subtableNum = 1000,insertRows = 100):  
 | ||
|     def __init__(self,ts,host,user,password,dbname,
 | ||
|                 stb_prefix,subtb_prefix,n_Therads,r_Therads,probabilities,loop,
 | ||
|                 stableNum ,subtableNum ,insertRows ,mix_table, replay):  
 | ||
|         self.n_numOfTherads = n_Therads
 | ||
|         self.r_numOfTherads = r_Therads
 | ||
|         self.ts=ts
 | ||
|         self.host = host
 | ||
|         self.user = user
 | ||
|         self.password = password
 | ||
|         self.dbname=dbname
 | ||
|         self.stb_prefix = stb_prefix
 | ||
|         self.subtb_prefix = subtb_prefix
 | ||
|         self.stb_list=[]
 | ||
|         self.subtb_list=[]
 | ||
|         self.stb_stru_list=[]
 | ||
|         self.subtb_stru_list=[]
 | ||
|         self.stb_tag_list=[]
 | ||
|         self.subtb_tag_list=[]
 | ||
|         self.probabilities = [1-probabilities,probabilities]
 | ||
|         self.ifjoin = [1,0]
 | ||
|         self.loop = loop
 | ||
|         self.stableNum = stableNum
 | ||
|         self.subtableNum = subtableNum
 | ||
|         self.insertRows = insertRows
 | ||
|         self.mix_table = mix_table
 | ||
|         self.max_ts = datetime.datetime.now()
 | ||
|         self.min_ts = datetime.datetime.now() - datetime.timedelta(days=5)
 | ||
|         self.replay = replay
 | ||
|     def SetThreadsNum(self,num):
 | ||
|         self.numOfTherads=num
 | ||
| 
 | ||
|     def ret_fcol(self,cl,sql):                     #返回结果的第一列
 | ||
|         cl.execute(sql)
 | ||
|         fcol_list=[]
 | ||
|         for data in cl:
 | ||
|             fcol_list.append(data[0])
 | ||
|         return fcol_list
 | ||
| 
 | ||
|     def r_stb_list(self,cl):                    #返回超级表列表
 | ||
|         sql='show '+self.dbname+'.stables'
 | ||
|         self.stb_list=self.ret_fcol(cl,sql)
 | ||
| 
 | ||
|     def r_subtb_list(self,cl,stablename):       #每个超级表返回2个子表
 | ||
|         sql='select tbname from '+self.dbname+'.'+stablename+' limit 2;'
 | ||
|         self.subtb_list+=self.ret_fcol(cl,sql)
 | ||
| 
 | ||
|     def cal_struct(self,cl,tbname):             #查看表结构
 | ||
|         tb=[]
 | ||
|         tag=[]
 | ||
|         sql='describe '+self.dbname+'.'+tbname+';'
 | ||
|         cl.execute(sql)
 | ||
|         for data in cl:
 | ||
|             if data[3]:
 | ||
|                 tag.append(data[0])
 | ||
|             else:
 | ||
|                 tb.append(data[0])
 | ||
|         return tb,tag
 | ||
| 
 | ||
|     def r_stb_stru(self,cl):                    #获取所有超级表的表结构
 | ||
|         for i in self.stb_list:
 | ||
|             tb,tag=self.cal_struct(cl,i)
 | ||
|             self.stb_stru_list.append(tb)
 | ||
|             self.stb_tag_list.append(tag)
 | ||
| 
 | ||
|     def r_subtb_stru(self,cl):                  #返回所有子表的表结构
 | ||
|         for i in self.subtb_list:
 | ||
|             tb,tag=self.cal_struct(cl,i)
 | ||
|             self.subtb_stru_list.append(tb)
 | ||
|             self.subtb_tag_list.append(tag)
 | ||
| 
 | ||
|     def get_timespan(self,cl):                  #获取时间跨度(仅第一个超级表)
 | ||
|         sql = 'select first(_c0),last(_c0) from ' + self.dbname + '.' + self.stb_list[0] + ';'
 | ||
|         print(sql)
 | ||
|         cl.execute(sql)
 | ||
|         for data in cl:
 | ||
|             self.max_ts = data[1]
 | ||
|             self.min_ts = data[0]
 | ||
| 
 | ||
|     def get_full(self):                         #获取所有的表、表结构
 | ||
|         host = self.host
 | ||
|         user = self.user
 | ||
|         password = self.password
 | ||
|         conn = taos.connect(
 | ||
|             host,
 | ||
|             user,
 | ||
|             password,
 | ||
|             )
 | ||
|         cl = conn.cursor()
 | ||
|         self.r_stb_list(cl)
 | ||
|         for i in self.stb_list:
 | ||
|             self.r_subtb_list(cl,i)
 | ||
|         self.r_stb_stru(cl)
 | ||
|         self.r_subtb_stru(cl)
 | ||
|         self.get_timespan(cl)
 | ||
|         cl.close()
 | ||
|         conn.close()  
 | ||
|         
 | ||
|     #query condition
 | ||
|     def con_where(self,tlist,col_list,tag_list):                               
 | ||
|         l=[]
 | ||
|         for i in range(random.randint(0,len(tlist))):
 | ||
|             c = random.choice(where_list)
 | ||
|             if c == '_c0>now-10d':
 | ||
|                 rdate = self.min_ts + (self.max_ts - self.min_ts)/10 * random.randint(-11,11)
 | ||
|                 conlist = ' _c0 ' + random.choice(['<','>','>=','<=','<>']) + "'" + str(rdate) + "'"
 | ||
|                 if self.random_pick():
 | ||
|                     l.append(conlist)
 | ||
|                 else: l.append(c)
 | ||
|             elif '<50' in c:
 | ||
|                 conlist = ' ' + random.choice(tlist) + random.choice(['<','>','>=','<=','<>']) + str(random.randrange(-100,100))
 | ||
|                 l.append(conlist) 
 | ||
|             elif 'is null' in c:
 | ||
|                 conlist = ' ' + random.choice(tlist) + random.choice([' is null',' is not null'])
 | ||
|                 l.append(conlist) 
 | ||
|             else:
 | ||
|                 s_all = string.ascii_letters
 | ||
|                 conlist = ' ' + random.choice(tlist) + " like \'%" + random.choice(s_all) + "%\' " 
 | ||
|                 l.append(conlist)
 | ||
|         return 'where '+random.choice([' and ',' or ']).join(l)
 | ||
| 
 | ||
|     def con_interval(self,tlist,col_list,tag_list): 
 | ||
|         interval = 'interval(' + str(random.randint(0,20)) + random.choice(['a','s','d','w','n','y'])  + ')'          
 | ||
|         return interval
 | ||
| 
 | ||
|     def con_limit(self,tlist,col_list,tag_list):
 | ||
|         rand1 = str(random.randint(0,1000))
 | ||
|         rand2 = str(random.randint(0,1000))
 | ||
|         return random.choice(['limit ' + rand1,'limit ' + rand1 + ' offset '+rand2,
 | ||
|         ' slimit ' + rand1,' slimit ' + rand1 + ' offset ' + rand2,'limit '+rand1 + ' slimit '+ rand2,
 | ||
|         'limit '+ rand1 + ' offset' + rand2 + ' slimit '+ rand1 + ' soffset ' + rand2 ])
 | ||
|     
 | ||
|     def con_fill(self,tlist,col_list,tag_list):
 | ||
|         return random.choice(['fill(null)','fill(prev)','fill(none)','fill(LINEAR)'])
 | ||
|     
 | ||
|     def con_group(self,tlist,col_list,tag_list):
 | ||
|         rand_tag = random.randint(0,5)
 | ||
|         rand_col = random.randint(0,1)
 | ||
|         if len(tag_list):
 | ||
|             return 'group by '+','.join(random.sample(col_list,rand_col) + random.sample(tag_list,rand_tag))
 | ||
|         else:
 | ||
|             return 'group by '+','.join(random.sample(col_list,rand_col))
 | ||
| 
 | ||
|     def con_order(self,tlist,col_list,tag_list):
 | ||
|         return 'order by '+random.choice(tlist)
 | ||
|     
 | ||
|     def gen_subquery_sql(self):
 | ||
|         subsql ,col_num = self.gen_query_sql(1)
 | ||
|         if col_num == 0:
 | ||
|             return 0
 | ||
|         col_list=[]
 | ||
|         tag_list=[]
 | ||
|         for i in range(col_num):
 | ||
|             col_list.append("taosd%d"%i)
 | ||
| 
 | ||
|         tlist=col_list+['abc']            #增加不存在的域'abc',是否会引起新bug
 | ||
|         con_rand=random.randint(0,len(condition_list))
 | ||
|         func_rand=random.randint(0,len(func_list))
 | ||
|         col_rand=random.randint(0,len(col_list))
 | ||
|         t_rand=random.randint(0,len(tlist))
 | ||
|         sql='select '                                           #select 
 | ||
|         random.shuffle(col_list)
 | ||
|         random.shuffle(func_list)
 | ||
|         sel_col_list=[]
 | ||
|         col_rand=random.randint(0,len(col_list))
 | ||
|         loop = 0
 | ||
|         for i,j in zip(col_list[0:col_rand],func_list):         #决定每个被查询col的函数
 | ||
|             alias = ' as '+ 'sub%d ' % loop
 | ||
|             loop += 1
 | ||
|             pick_func = ''
 | ||
|             if j == 'leastsquares':
 | ||
|                 pick_func=j+'('+i+',1,1)'
 | ||
|             elif j == 'top' or j == 'bottom' or j == 'percentile' or j == 'apercentile':
 | ||
|                 pick_func=j+'('+i+',1)'
 | ||
|             else:
 | ||
|                 pick_func=j+'('+i+')'
 | ||
|             if bool(random.getrandbits(1)) :
 | ||
|                 pick_func+=alias
 | ||
|             sel_col_list.append(pick_func)
 | ||
|         if col_rand == 0:
 | ||
|             sql = sql + '*'   
 | ||
|         else: 
 | ||
|             sql=sql+','.join(sel_col_list)         #select col & func
 | ||
|         sql = sql + ' from ('+ subsql +') ' 
 | ||
|         con_func=[self.con_where,self.con_interval,self.con_limit,self.con_group,self.con_order,self.con_fill]
 | ||
|         sel_con=random.sample(con_func,random.randint(0,len(con_func)))
 | ||
|         sel_con_list=[]
 | ||
|         for i in sel_con:
 | ||
|             sel_con_list.append(i(tlist,col_list,tag_list))                                  #获取对应的条件函数
 | ||
|         sql+=' '.join(sel_con_list)                                       # condition
 | ||
|         #print(sql)
 | ||
|         return sql
 | ||
|     
 | ||
|     def gen_query_sql(self,subquery=0):                        #生成查询语句
 | ||
|         tbi=random.randint(0,len(self.subtb_list)+len(self.stb_list))  #随机决定查询哪张表
 | ||
|         tbname=''
 | ||
|         col_list=[]
 | ||
|         tag_list=[]
 | ||
|         is_stb=0
 | ||
|         if tbi>len(self.stb_list) :
 | ||
|             tbi=tbi-len(self.stb_list)
 | ||
|             tbname=self.subtb_list[tbi-1]
 | ||
|             col_list=self.subtb_stru_list[tbi-1]
 | ||
|             tag_list=self.subtb_tag_list[tbi-1]
 | ||
|         else:
 | ||
|             tbname=self.stb_list[tbi-1]
 | ||
|             col_list=self.stb_stru_list[tbi-1]
 | ||
|             tag_list=self.stb_tag_list[tbi-1]
 | ||
|             is_stb=1
 | ||
|         tlist=col_list+tag_list+['abc']            #增加不存在的域'abc',是否会引起新bug
 | ||
|         con_rand=random.randint(0,len(condition_list))
 | ||
|         func_rand=random.randint(0,len(func_list))
 | ||
|         col_rand=random.randint(0,len(col_list))
 | ||
|         tag_rand=random.randint(0,len(tag_list))
 | ||
|         t_rand=random.randint(0,len(tlist))
 | ||
|         sql='select '                                           #select 
 | ||
|         random.shuffle(col_list)
 | ||
|         random.shuffle(func_list)
 | ||
|         sel_col_list=[]
 | ||
|         col_rand=random.randint(0,len(col_list))
 | ||
|         loop = 0
 | ||
|         for i,j in zip(col_list[0:col_rand],func_list):         #决定每个被查询col的函数
 | ||
|             alias = ' as '+ 'taos%d ' % loop
 | ||
|             loop += 1
 | ||
|             pick_func = ''
 | ||
|             if j == 'leastsquares':
 | ||
|                 pick_func=j+'('+i+',1,1)'
 | ||
|             elif j == 'top' or j == 'bottom' or j == 'percentile' or j == 'apercentile':
 | ||
|                 pick_func=j+'('+i+',1)'
 | ||
|             else:
 | ||
|                 pick_func=j+'('+i+')'
 | ||
|             if bool(random.getrandbits(1)) | subquery :
 | ||
|                 pick_func+=alias
 | ||
|             sel_col_list.append(pick_func)
 | ||
|         if col_rand == 0 & subquery :
 | ||
|             sql = sql + '*'   
 | ||
|         else: 
 | ||
|             sql=sql+','.join(sel_col_list)         #select col & func
 | ||
|         if self.mix_table == 0:
 | ||
|             sql = sql + ' from '+random.choice(self.stb_list+self.subtb_list)+' '         
 | ||
|         elif self.mix_table == 1:
 | ||
|             sql = sql + ' from '+random.choice(self.subtb_list)+' '
 | ||
|         else:
 | ||
|             sql = sql + ' from '+random.choice(self.stb_list)+' ' 
 | ||
|         con_func=[self.con_where,self.con_interval,self.con_limit,self.con_group,self.con_order,self.con_fill]
 | ||
|         sel_con=random.sample(con_func,random.randint(0,len(con_func)))
 | ||
|         sel_con_list=[]
 | ||
|         for i in sel_con:
 | ||
|             sel_con_list.append(i(tlist,col_list,tag_list))                                  #获取对应的条件函数
 | ||
|         sql+=' '.join(sel_con_list)                                       # condition
 | ||
|         #print(sql)
 | ||
|         return (sql,loop)
 | ||
| 
 | ||
|     def gen_query_join(self):                        #生成join查询语句
 | ||
|         tbname   = []
 | ||
|         col_list = []
 | ||
|         tag_list = []
 | ||
|         col_intersection = []
 | ||
|         tag_intersection = []
 | ||
|         subtable = None
 | ||
|         if self.mix_table == 0:
 | ||
|             if bool(random.getrandbits(1)):
 | ||
|                 subtable = True
 | ||
|                 tbname = random.sample(self.subtb_list,2)
 | ||
|                 for i in tbname:
 | ||
|                     col_list.append(self.subtb_stru_list[self.subtb_list.index(i)])
 | ||
|                     tag_list.append(self.subtb_stru_list[self.subtb_list.index(i)])
 | ||
|                 col_intersection = list(set(col_list[0]).intersection(set(col_list[1])))
 | ||
|                 tag_intersection = list(set(tag_list[0]).intersection(set(tag_list[1])))
 | ||
|             else:
 | ||
|                 tbname = random.sample(self.stb_list,2)
 | ||
|                 for i in tbname:
 | ||
|                     col_list.append(self.stb_stru_list[self.stb_list.index(i)])
 | ||
|                     tag_list.append(self.stb_stru_list[self.stb_list.index(i)])
 | ||
|                 col_intersection = list(set(col_list[0]).intersection(set(col_list[1])))
 | ||
|                 tag_intersection = list(set(tag_list[0]).intersection(set(tag_list[1])))
 | ||
|         elif self.mix_table == 1:
 | ||
|             subtable = True
 | ||
|             tbname = random.sample(self.subtb_list,2)
 | ||
|             for i in tbname:
 | ||
|                 col_list.append(self.subtb_stru_list[self.subtb_list.index(i)])
 | ||
|                 tag_list.append(self.subtb_stru_list[self.subtb_list.index(i)])
 | ||
|             col_intersection = list(set(col_list[0]).intersection(set(col_list[1])))
 | ||
|             tag_intersection = list(set(tag_list[0]).intersection(set(tag_list[1])))
 | ||
|         else:
 | ||
|             tbname = random.sample(self.stb_list,2)
 | ||
|             for i in tbname:
 | ||
|                 col_list.append(self.stb_stru_list[self.stb_list.index(i)])
 | ||
|                 tag_list.append(self.stb_stru_list[self.stb_list.index(i)])
 | ||
|             col_intersection = list(set(col_list[0]).intersection(set(col_list[1])))
 | ||
|             tag_intersection = list(set(tag_list[0]).intersection(set(tag_list[1])))
 | ||
|         con_rand=random.randint(0,len(condition_list))
 | ||
|         col_rand=random.randint(0,len(col_list))
 | ||
|         tag_rand=random.randint(0,len(tag_list))
 | ||
|         sql='select '                                           #select 
 | ||
|         
 | ||
|         sel_col_tag=[]
 | ||
|         col_rand=random.randint(0,len(col_list))
 | ||
|         if bool(random.getrandbits(1)):
 | ||
|             sql += '*'
 | ||
|         else:
 | ||
|             sel_col_tag.append('t1.' + str(random.choice(col_list[0] + tag_list[0])))
 | ||
|             sel_col_tag.append('t2.' + str(random.choice(col_list[1] + tag_list[1])))
 | ||
|             sel_col_list = []
 | ||
|             random.shuffle(func_list)
 | ||
|             if self.random_pick():
 | ||
|                 loop = 0
 | ||
|                 for i,j in zip(sel_col_tag,func_list):         #决定每个被查询col的函数
 | ||
|                     alias = ' as '+ 'taos%d ' % loop
 | ||
|                     loop += 1
 | ||
|                     pick_func = ''
 | ||
|                     if j == 'leastsquares':
 | ||
|                         pick_func=j+'('+i+',1,1)'
 | ||
|                     elif j == 'top' or j == 'bottom' or j == 'percentile' or j == 'apercentile':
 | ||
|                         pick_func=j+'('+i+',1)'
 | ||
|                     else:
 | ||
|                         pick_func=j+'('+i+')'
 | ||
|                     if bool(random.getrandbits(1)):
 | ||
|                         pick_func+=alias
 | ||
|                     sel_col_list.append(pick_func)
 | ||
|                 sql += ','.join(sel_col_list)
 | ||
|             else:
 | ||
|                 sql += ','.join(sel_col_tag)
 | ||
| 
 | ||
|         sql = sql + ' from '+ str(tbname[0]) +' t1,' + str(tbname[1]) + ' t2 '                        #select col & func
 | ||
|         join_section = None
 | ||
|         temp = None
 | ||
|         if subtable:
 | ||
|             temp = random.choices(col_intersection)
 | ||
|             join_section = temp.pop()
 | ||
|             sql += 'where t1._c0 = t2._c0 and ' + 't1.' + str(join_section) + '=t2.' + str(join_section)
 | ||
|         else:
 | ||
|             temp = random.choices(col_intersection+tag_intersection)
 | ||
|             join_section = temp.pop()
 | ||
|             sql += 'where t1._c0 = t2._c0 and ' + 't1.' + str(join_section) + '=t2.' + str(join_section)
 | ||
|         return sql
 | ||
| 
 | ||
|     def random_pick(self): 
 | ||
|         x = random.uniform(0,1) 
 | ||
|         cumulative_probability = 0.0 
 | ||
|         for item, item_probability in zip(self.ifjoin, self.probabilities): 
 | ||
|             cumulative_probability += item_probability 
 | ||
|             if x < cumulative_probability:break 
 | ||
|         return item
 | ||
|         
 | ||
|     def gen_data(self):
 | ||
|         stableNum = self.stableNum
 | ||
|         subtableNum = self.subtableNum
 | ||
|         insertRows = self.insertRows
 | ||
|         t0 = self.ts
 | ||
|         host = self.host
 | ||
|         user = self.user
 | ||
|         password = self.password
 | ||
|         conn = taos.connect(
 | ||
|             host,
 | ||
|             user,
 | ||
|             password,
 | ||
|             )
 | ||
|         cl = conn.cursor()
 | ||
|         cl.execute("drop database if  exists %s;" %self.dbname)
 | ||
|         cl.execute("create database if not exists %s;" %self.dbname)
 | ||
|         cl.execute("use %s" % self.dbname)
 | ||
|         for k in range(stableNum):
 | ||
|             sql="create table %s (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool,c8 binary(20),c9 nchar(20),c11 int unsigned,c12 smallint unsigned,c13 tinyint unsigned,c14 bigint unsigned) \
 | ||
|             tags(t1 int, t2 float, t3 bigint, t4 smallint, t5 tinyint, t6 double, t7 bool,t8 binary(20),t9 nchar(20), t11 int unsigned , t12 smallint unsigned , t13 tinyint unsigned , t14 bigint unsigned)" % (self.stb_prefix+str(k))
 | ||
|             cl.execute(sql)
 | ||
|             for j in range(subtableNum):
 | ||
|                 if j % 100 == 0:
 | ||
|                     sql = "create table %s using %s tags(NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)" % \
 | ||
|                             (self.subtb_prefix+str(k)+'_'+str(j),self.stb_prefix+str(k))
 | ||
|                 else:
 | ||
|                     sql = "create table %s using %s tags(%d,%d,%d,%d,%d,%d,%d,'%s','%s',%d,%d,%d,%d)" % \
 | ||
|                             (self.subtb_prefix+str(k)+'_'+str(j),self.stb_prefix+str(k),j,j/2.0,j%41,j%51,j%53,j*1.0,j%2,'taos'+str(j),'涛思'+str(j), j%43, j%23 , j%17 , j%3167)
 | ||
|                 print(sql)
 | ||
|                 cl.execute(sql)
 | ||
|                 for i in range(insertRows):
 | ||
|                     if i % 100 == 0 :
 | ||
|                         ret = cl.execute(
 | ||
|                         "insert into %s values (%d , NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)" %
 | ||
|                         (self.subtb_prefix+str(k)+'_'+str(j), t0+i))
 | ||
|                     else:
 | ||
|                         ret = cl.execute(
 | ||
|                             "insert into %s values (%d , %d,%d,%d,%d,%d,%d,%d,'%s','%s',%d,%d,%d,%d)" %
 | ||
|                             (self.subtb_prefix+str(k)+'_'+str(j), t0+i, i%100, i/2.0, i%41, i%51, i%53, i*1.0, i%2,'taos'+str(i),'涛思'+str(i), i%43, i%23 , i%17 , i%3167))
 | ||
|         cl.close()
 | ||
|         conn.close()
 | ||
|         
 | ||
|     def rest_query(self,sql):                                       #rest 接口
 | ||
|         host = self.host
 | ||
|         user = self.user
 | ||
|         password = self.password
 | ||
|         port =6041
 | ||
|         url = "http://{}:{}/rest/sql".format(host, port )
 | ||
|         try:
 | ||
|             r = requests.post(url, 
 | ||
|                 data = 'use %s' % self.dbname,
 | ||
|                 auth = HTTPBasicAuth('root', 'taosdata'))  
 | ||
|             r = requests.post(url, 
 | ||
|                 data = sql,
 | ||
|                 auth = HTTPBasicAuth('root', 'taosdata'))         
 | ||
|         except:
 | ||
|             print("REST API Failure (TODO: more info here)")
 | ||
|             raise
 | ||
|         rj = r.json()
 | ||
|         if ('status' not in rj):
 | ||
|             raise RuntimeError("No status in REST response")
 | ||
| 
 | ||
|         if rj['status'] == 'error':  # clearly reported error
 | ||
|             if ('code' not in rj):  # error without code
 | ||
|                 raise RuntimeError("REST error return without code")
 | ||
|             errno = rj['code']  # May need to massage this in the future
 | ||
|             # print("Raising programming error with REST return: {}".format(rj))
 | ||
|             raise taos.error.ProgrammingError(
 | ||
|                 rj['desc'], errno)  # todo: check existance of 'desc'
 | ||
| 
 | ||
|         if rj['status'] != 'succ':  # better be this
 | ||
|             raise RuntimeError(
 | ||
|                 "Unexpected REST return status: {}".format(
 | ||
|                     rj['status']))
 | ||
| 
 | ||
|         nRows = rj['rows'] if ('rows' in rj) else 0
 | ||
|         return nRows
 | ||
| 
 | ||
|     
 | ||
|     def query_thread_n(self,threadID):                      #使用原生python接口查询
 | ||
|         host = self.host
 | ||
|         user = self.user
 | ||
|         password = self.password
 | ||
|         conn = taos.connect(
 | ||
|             host,
 | ||
|             user,
 | ||
|             password,
 | ||
|             )
 | ||
|         cl = conn.cursor()
 | ||
|         cl.execute("use %s;" % self.dbname)
 | ||
|         fo = open('bak_sql_n_%d'%threadID,'w+')
 | ||
|         print("Thread %d: starting" % threadID)
 | ||
|         loop = self.loop
 | ||
|         while loop:
 | ||
|             
 | ||
|                 try:
 | ||
|                     if self.random_pick():
 | ||
|                         if self.random_pick():
 | ||
|                             sql,temp=self.gen_query_sql()
 | ||
|                         else:
 | ||
|                             sql = self.gen_subquery_sql()
 | ||
|                     else:
 | ||
|                         sql = self.gen_query_join()
 | ||
|                     print("sql is ",sql)
 | ||
|                     fo.write(sql+'\n')
 | ||
|                     start = time.time()
 | ||
|                     cl.execute(sql)
 | ||
|                     cl.fetchall()
 | ||
|                     end = time.time()
 | ||
|                     print("time cost :",end-start)
 | ||
|                 except Exception as e:
 | ||
|                     print('-'*40)
 | ||
|                     print(
 | ||
|                 "Failure thread%d, sql: %s \nexception: %s" %
 | ||
|                 (threadID, str(sql),str(e)))
 | ||
|                     err_uec='Unable to establish connection'
 | ||
|                     if err_uec in str(e) and loop >0:
 | ||
|                         exit(-1)
 | ||
|                 loop -= 1
 | ||
|                 if loop == 0: break
 | ||
|         fo.close()            
 | ||
|         cl.close()
 | ||
|         conn.close()       
 | ||
|         print("Thread %d: finishing" % threadID)
 | ||
| 
 | ||
|     def query_thread_nr(self,threadID):                      #使用原生python接口进行重放
 | ||
|         host = self.host
 | ||
|         user = self.user
 | ||
|         password = self.password
 | ||
|         conn = taos.connect(
 | ||
|             host,
 | ||
|             user,
 | ||
|             password,
 | ||
|             )
 | ||
|         cl = conn.cursor()
 | ||
|         cl.execute("use %s;" % self.dbname)
 | ||
|         replay_sql = []
 | ||
|         with  open('bak_sql_n_%d'%threadID,'r') as f:
 | ||
|             replay_sql = f.readlines()
 | ||
|         print("Replay Thread %d: starting" % threadID)
 | ||
|         for sql in replay_sql:
 | ||
|             try:
 | ||
|                 print("sql is ",sql)
 | ||
|                 start = time.time()
 | ||
|                 cl.execute(sql)
 | ||
|                 cl.fetchall()
 | ||
|                 end = time.time()
 | ||
|                 print("time cost :",end-start)
 | ||
|             except Exception as e:
 | ||
|                 print('-'*40)
 | ||
|                 print(
 | ||
|             "Failure thread%d, sql: %s \nexception: %s" %
 | ||
|             (threadID, str(sql),str(e)))
 | ||
|                 err_uec='Unable to establish connection'
 | ||
|                 if err_uec in str(e) and loop >0:
 | ||
|                     exit(-1)         
 | ||
|         cl.close()
 | ||
|         conn.close()       
 | ||
|         print("Replay Thread %d: finishing" % threadID)
 | ||
|           
 | ||
|     def query_thread_r(self,threadID):                      #使用rest接口查询
 | ||
|         print("Thread %d: starting" % threadID)
 | ||
|         fo = open('bak_sql_r_%d'%threadID,'w+')
 | ||
|         loop = self.loop
 | ||
|         while loop:
 | ||
|             try:
 | ||
|                 if self.random_pick():
 | ||
|                     if self.random_pick():
 | ||
|                         sql,temp=self.gen_query_sql()
 | ||
|                     else:
 | ||
|                         sql = self.gen_subquery_sql()
 | ||
|                 else:
 | ||
|                     sql = self.gen_query_join()
 | ||
|                 print("sql is ",sql)
 | ||
|                 fo.write(sql+'\n')
 | ||
|                 start = time.time()
 | ||
|                 self.rest_query(sql)
 | ||
|                 end = time.time()
 | ||
|                 print("time cost :",end-start)
 | ||
|             except Exception as e:
 | ||
|                 print('-'*40)
 | ||
|                 print(
 | ||
|             "Failure thread%d, sql: %s \nexception: %s" %
 | ||
|             (threadID, str(sql),str(e)))
 | ||
|                 err_uec='Unable to establish connection'
 | ||
|                 if err_uec in str(e) and loop >0:
 | ||
|                     exit(-1)
 | ||
|             loop -= 1    
 | ||
|             if loop == 0: break
 | ||
|         fo.close()      
 | ||
|         print("Thread %d: finishing" % threadID)   
 | ||
| 
 | ||
|     def query_thread_rr(self,threadID):                      #使用rest接口重放
 | ||
|         print("Replay Thread %d: starting" % threadID)
 | ||
|         replay_sql = []
 | ||
|         with  open('bak_sql_r_%d'%threadID,'r') as f:
 | ||
|             replay_sql = f.readlines()
 | ||
| 
 | ||
|         for sql in replay_sql:
 | ||
|             try:
 | ||
|                 print("sql is ",sql)
 | ||
|                 start = time.time()
 | ||
|                 self.rest_query(sql)
 | ||
|                 end = time.time()
 | ||
|                 print("time cost :",end-start)
 | ||
|             except Exception as e:
 | ||
|                 print('-'*40)
 | ||
|                 print(
 | ||
|             "Failure thread%d, sql: %s \nexception: %s" %
 | ||
|             (threadID, str(sql),str(e)))
 | ||
|                 err_uec='Unable to establish connection'
 | ||
|                 if err_uec in str(e) and loop >0:
 | ||
|                     exit(-1)  
 | ||
|         print("Replay Thread %d: finishing" % threadID)  
 | ||
| 
 | ||
|     def run(self):
 | ||
|         print(self.n_numOfTherads,self.r_numOfTherads)  
 | ||
|         threads = []
 | ||
|         if self.replay:  #whether replay 
 | ||
|             for i in range(self.n_numOfTherads):
 | ||
|                 thread = threading.Thread(target=self.query_thread_nr, args=(i,))
 | ||
|                 threads.append(thread)
 | ||
|                 thread.start()  
 | ||
|             for i in range(self.r_numOfTherads):
 | ||
|                 thread = threading.Thread(target=self.query_thread_rr, args=(i,))
 | ||
|                 threads.append(thread)
 | ||
|                 thread.start()
 | ||
|         else:
 | ||
|             for i in range(self.n_numOfTherads):
 | ||
|                 thread = threading.Thread(target=self.query_thread_n, args=(i,))
 | ||
|                 threads.append(thread)
 | ||
|                 thread.start()  
 | ||
|             for i in range(self.r_numOfTherads):
 | ||
|                 thread = threading.Thread(target=self.query_thread_r, args=(i,))
 | ||
|                 threads.append(thread)
 | ||
|                 thread.start()
 | ||
|  
 | ||
| parser = argparse.ArgumentParser()
 | ||
| parser.add_argument(
 | ||
|     '-H',
 | ||
|     '--host-name',
 | ||
|     action='store',
 | ||
|     default='127.0.0.1',
 | ||
|     type=str,
 | ||
|     help='host name to be connected (default: 127.0.0.1)')
 | ||
| parser.add_argument(
 | ||
|     '-S',
 | ||
|     '--ts',
 | ||
|     action='store',
 | ||
|     default=1500000000000,
 | ||
|     type=int,
 | ||
|     help='insert data from timestamp (default: 1500000000000)')
 | ||
| parser.add_argument(
 | ||
|     '-d',
 | ||
|     '--db-name',
 | ||
|     action='store',
 | ||
|     default='test',
 | ||
|     type=str,
 | ||
|     help='Database name to be created (default: test)')
 | ||
| parser.add_argument(
 | ||
|     '-t',
 | ||
|     '--number-of-native-threads',
 | ||
|     action='store',
 | ||
|     default=10,
 | ||
|     type=int,
 | ||
|     help='Number of native threads (default: 10)')
 | ||
| parser.add_argument(
 | ||
|     '-T',
 | ||
|     '--number-of-rest-threads',
 | ||
|     action='store',
 | ||
|     default=10,
 | ||
|     type=int,
 | ||
|     help='Number of rest threads (default: 10)')
 | ||
| parser.add_argument(
 | ||
|     '-r',
 | ||
|     '--number-of-records',
 | ||
|     action='store',
 | ||
|     default=100,
 | ||
|     type=int,
 | ||
|     help='Number of record to be created for each table  (default: 100)')
 | ||
| parser.add_argument(
 | ||
|     '-c',
 | ||
|     '--create-table',
 | ||
|     action='store',
 | ||
|     default='0',
 | ||
|     type=int,
 | ||
|     help='whether gen data (default: 0)')
 | ||
| parser.add_argument(
 | ||
|     '-p',
 | ||
|     '--subtb-name-prefix',
 | ||
|     action='store',
 | ||
|     default='t',
 | ||
|     type=str,
 | ||
|     help='subtable-name-prefix (default: t)')
 | ||
| parser.add_argument(
 | ||
|     '-P',
 | ||
|     '--stb-name-prefix',
 | ||
|     action='store',
 | ||
|     default='st',
 | ||
|     type=str,
 | ||
|     help='stable-name-prefix (default: st)')
 | ||
| parser.add_argument(
 | ||
|     '-b',
 | ||
|     '--probabilities',
 | ||
|     action='store',
 | ||
|     default='0.05',
 | ||
|     type=float,
 | ||
|     help='probabilities of join (default: 0.05)')
 | ||
| parser.add_argument(
 | ||
|     '-l',
 | ||
|     '--loop-per-thread',
 | ||
|     action='store',
 | ||
|     default='100',
 | ||
|     type=int,
 | ||
|     help='loop per thread (default: 100)')
 | ||
| parser.add_argument(
 | ||
|     '-u',
 | ||
|     '--user',
 | ||
|     action='store', 
 | ||
|     default='root',
 | ||
|     type=str,
 | ||
|     help='user name')
 | ||
| parser.add_argument(
 | ||
|     '-w',
 | ||
|     '--password',
 | ||
|     action='store', 
 | ||
|     default='root',
 | ||
|     type=str,
 | ||
|     help='user name')
 | ||
| parser.add_argument(
 | ||
|     '-n',
 | ||
|     '--number-of-tables',
 | ||
|     action='store',
 | ||
|     default=1000,
 | ||
|     type=int,
 | ||
|     help='Number of subtales per stable (default: 1000)')
 | ||
| parser.add_argument(
 | ||
|     '-N',
 | ||
|     '--number-of-stables',
 | ||
|     action='store',
 | ||
|     default=2,
 | ||
|     type=int,
 | ||
|     help='Number of stables  (default: 2)')
 | ||
| parser.add_argument(
 | ||
|     '-m',
 | ||
|     '--mix-stable-subtable',
 | ||
|     action='store',
 | ||
|     default=0,
 | ||
|     type=int,
 | ||
|     help='0:stable & substable ,1:subtable ,2:stable (default: 0)')
 | ||
| parser.add_argument(
 | ||
|     '-R',
 | ||
|     '--replay',
 | ||
|     action='store',
 | ||
|     default=0,
 | ||
|     type=int,
 | ||
|     help='0:not replay ,1:replay  (default: 0)')
 | ||
| 
 | ||
| args = parser.parse_args()
 | ||
| q = ConcurrentInquiry(
 | ||
|     args.ts,args.host_name,args.user,args.password,args.db_name,
 | ||
|                 args.stb_name_prefix,args.subtb_name_prefix,args.number_of_native_threads,args.number_of_rest_threads,
 | ||
|                 args.probabilities,args.loop_per_thread,args.number_of_stables,args.number_of_tables ,args.number_of_records,
 | ||
|                 args.mix_stable_subtable, args.replay )
 | ||
| 
 | ||
| if args.create_table: 
 | ||
|     q.gen_data()
 | ||
| q.get_full()
 | ||
| 
 | ||
| #q.gen_query_sql()
 | ||
| q.run()
 | ||
| 
 |