fill add test
This commit is contained in:
parent
e1e5a7780c
commit
0e868735be
|
@ -1,8 +1,10 @@
|
||||||
import random
|
import random
|
||||||
|
from fabric2.runners import threading
|
||||||
from pandas._libs import interval
|
from pandas._libs import interval
|
||||||
import taos
|
import taos
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
from util.common import TDCom
|
||||||
from util.log import *
|
from util.log import *
|
||||||
from util.sql import *
|
from util.sql import *
|
||||||
from util.cases import *
|
from util.cases import *
|
||||||
|
@ -26,16 +28,20 @@ class TDTestCase:
|
||||||
ret.append((i,j))
|
ret.append((i,j))
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
def check_fill_range(self, where_start, where_end, res, sql: str):
|
def check_fill_range(self, where_start, where_end, res_asc, res_desc, sql: str, interval):
|
||||||
if len(res) == 0:
|
if len(res_asc) != len(res_desc):
|
||||||
tdLog.debug(f'fill sql got no rows {sql}')
|
tdLog.exit(f"err, asc desc with different rows, asc: {len(res_asc)}, desc: {len(res_desc)} sql: {sql}")
|
||||||
|
if len(res_asc) == 0:
|
||||||
|
tdLog.info(f'from {where_start} to {where_end} no rows returned')
|
||||||
return
|
return
|
||||||
if len(res) == 1:
|
asc_first = res_asc[0]
|
||||||
tdLog.debug(f'fill sql got one row {sql}: {res}')
|
asc_last = res_asc[-1]
|
||||||
|
desc_first = res_desc[0]
|
||||||
|
desc_last = res_desc[-1]
|
||||||
|
if asc_first[0] != desc_last[0] or asc_last[0] != desc_first[0]:
|
||||||
|
tdLog.exit(f'fill sql different row data {sql}: asc<{asc_first[0].timestamp()}, {asc_last[0].timestamp()}>, desc<{desc_last[0].timestamp()}, {desc_first[0].timestamp()}>')
|
||||||
else:
|
else:
|
||||||
first = res[0]
|
tdLog.info(f'from {where_start} to {where_end} same time returned asc<{asc_first[0].timestamp()}, {asc_last[0].timestamp()}>, desc<{desc_last[0].timestamp()}, {desc_first[0].timestamp()}> interval: {interval}')
|
||||||
last = res[-1]
|
|
||||||
tdLog.debug(f'fill sql got rows {sql}: {res}')
|
|
||||||
|
|
||||||
def generate_partition_by(self):
|
def generate_partition_by(self):
|
||||||
val = random.random()
|
val = random.random()
|
||||||
|
@ -48,36 +54,58 @@ class TDTestCase:
|
||||||
|
|
||||||
def generate_fill_interval(self)-> list[tuple]:
|
def generate_fill_interval(self)-> list[tuple]:
|
||||||
ret = []
|
ret = []
|
||||||
intervals = [1, 30, 60, 90, 120, 300, 3600]
|
intervals = [60, 90, 120, 300, 3600]
|
||||||
for i in range(0, len(intervals)):
|
for i in range(0, len(intervals)):
|
||||||
for j in range(0, i+1):
|
for j in range(0, i+1):
|
||||||
ret.append((intervals[i], intervals[j]))
|
ret.append((intervals[i], intervals[j]))
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
def generate_fill_sql(self, where_start, where_end, fill_interval: tuple) -> str:
|
def generate_fill_sql(self, where_start, where_end, fill_interval: tuple):
|
||||||
partition_by = self.generate_partition_by()
|
partition_by = self.generate_partition_by()
|
||||||
where = f'ts >= {where_start} and ts < {where_end}'
|
where = f'where ts >= {where_start} and ts < {where_end}'
|
||||||
return f'select _wstart, _wend, count(*) from meters {where} {partition_by} interval({fill_interval[0]}s) sliding({fill_interval[1]}s) fill(NULL)'
|
sql = f'select _wstart, _wend, count(*) from test.meters {where} {partition_by} interval({fill_interval[0]}s) sliding({fill_interval[1]}s) fill(NULL)'
|
||||||
|
sql_asc = sql + " order by _wstart asc"
|
||||||
|
sql_desc = sql + " order by _wstart desc"
|
||||||
|
return sql_asc, sql_desc
|
||||||
|
|
||||||
|
def fill_test_thread_routine(self, cli: TDSql, interval, data_start, data_end, step):
|
||||||
|
ranges = self.generate_fill_range(data_start, data_end, interval[0], step)
|
||||||
|
for range in ranges:
|
||||||
|
sql_asc, sql_desc = self.generate_fill_sql(range[0], range[1], interval)
|
||||||
|
cli.query(sql_asc, queryTimes=1)
|
||||||
|
asc_res = cli.queryResult
|
||||||
|
cli.query(sql_desc, queryTimes=1)
|
||||||
|
desc_res = cli.queryResult
|
||||||
|
self.check_fill_range(range[0], range[1], asc_res,desc_res , sql_asc, interval)
|
||||||
|
|
||||||
def test_fill_range(self):
|
def test_fill_range(self):
|
||||||
os.system('taosBenchmark -t 1000 -n 1000 -v 2 -S 32000 -y')
|
os.system('taosBenchmark -t 10 -n 10000 -v 8 -S 32000 -y')
|
||||||
data_start = 1500000000
|
data_start = 1500000000000
|
||||||
data_end = 1500031968
|
data_end = 1500319968000
|
||||||
step = 100
|
step = 2000000
|
||||||
|
tdCom = TDCom()
|
||||||
|
inses: list[TDSql] = []
|
||||||
|
threads: list[threading.Thread] = []
|
||||||
|
|
||||||
fill_intervals: list[tuple] = self.generate_fill_interval()
|
fill_intervals: list[tuple] = self.generate_fill_interval()
|
||||||
fill_interval = random.choice(fill_intervals)
|
for interval in fill_intervals:
|
||||||
ranges = self.generate_fill_range(data_start, data_end, fill_interval[0], step)
|
ins = tdCom.newTdSql()
|
||||||
range = random.choice(ranges)
|
t = threading.Thread(target=self.fill_test_thread_routine, args=(ins, interval, data_start, data_end, step))
|
||||||
sql = self.generate_fill_sql(range[0], range[1], fill_interval)
|
t.start()
|
||||||
tdSql.query(sql)
|
inses.append(ins)
|
||||||
res = tdSql.queryResult
|
threads.append(t)
|
||||||
self.check_fill_range(range[0], range[1], res, sql)
|
|
||||||
|
for t in threads:
|
||||||
|
t.join()
|
||||||
|
|
||||||
|
for ins in inses:
|
||||||
|
ins.close()
|
||||||
|
|
||||||
## tdSql.execute('drop database test')
|
## tdSql.execute('drop database test')
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.test_fill_range()
|
self.test_fill_range()
|
||||||
|
return
|
||||||
dbname = "db"
|
dbname = "db"
|
||||||
tbname = "tb"
|
tbname = "tb"
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue