add multi thread

This commit is contained in:
jiajingbin 2021-07-24 19:39:20 +08:00
parent 8ee0962538
commit 63c9b2069e
1 changed files with 158 additions and 24 deletions

View File

@ -245,8 +245,12 @@ class TDTestCase:
sql_seq = f'{stb_name},{id}=\"{tb_name}\",t0={t0},t1={t1},t2={t2},t3={t3},t4={t4},t5={t5},t6={t6},t7={t7},t8={t8},t11={t1},t10={t8} c0={c0},c1={c1},c2={c2},c3={c3},c4={c4},c5={c5},c6={c6},c7={c7},c8={c8},c9={c9},c11={c8},c10={t0} {ts}'
if ct_am_tag is not None:
sql_seq = f'{stb_name},{id}=\"{tb_name}\",t0={t0},t1={t1},t2={t2},t3={t3},t4={t4},t5={t5},t6={t6} c0={c0},c1={c1},c2={c2},c3={c3},c4={c4},c5={c5},c6={c6},c7={c7},c8={c8},c9={c9},c11={c8},c10={t0} {ts}'
if id_noexist_tag is not None:
sql_seq = f'{stb_name},t0={t0},t1={t1},t2={t2},t3={t3},t4={t4},t5={t5},t6={t6} c0={c0},c1={c1},c2={c2},c3={c3},c4={c4},c5={c5},c6={c6},c7={c7},c8={c8},c9={c9},c11={c8},c10={t0} {ts}'
if ct_ma_tag is not None:
sql_seq = f'{stb_name},{id}=\"{tb_name}\",t0={t0},t1={t1},t2={t2},t3={t3},t4={t4},t5={t5},t6={t6},t7={t7},t8={t8},t11={t1},t10={t8} c0={c0},c1={c1},c2={c2},c3={c3},c4={c4},c5={c5},c6={c6} {ts}'
if id_noexist_tag is not None:
sql_seq = f'{stb_name},t0={t0},t1={t1},t2={t2},t3={t3},t4={t4},t5={t5},t6={t6},t7={t7},t8={t8},t11={t1},t10={t8} c0={c0},c1={c1},c2={c2},c3={c3},c4={c4},c5={c5},c6={c6} {ts}'
if ct_min_tag is not None:
sql_seq = f'{stb_name},{id}=\"{tb_name}\",t0={t0},t1={t1},t2={t2},t3={t3},t4={t4},t5={t5},t6={t6} c0={c0},c1={c1},c2={c2},c3={c3},c4={c4},c5={c5},c6={c6} {ts}'
return sql_seq, stb_name, tb_name
@ -326,15 +330,16 @@ class TDTestCase:
def cleanStb(self):
query_sql = "show stables"
res_row_list = self.resHandle(query_sql, True)[0]
print(res_row_list)
for stb in res_row_list:
res_row_list = tdSql.query(query_sql, True)
stb_list = map(lambda x: x[0], res_row_list)
for stb in stb_list:
tdSql.execute(f'drop table if exists {stb}')
def initCheckCase(self):
"""
normal tags and cols, one for every elm
"""
self.cleanStb()
input_sql, stb_name, tb_name = self.genFullTypeSql()
self.resCmp(input_sql, stb_name)
@ -342,6 +347,7 @@ class TDTestCase:
"""
check all normal type
"""
self.cleanStb()
full_type_list = ["f", "F", "false", "False", "t", "T", "true", "True"]
for t_type in full_type_list:
input_sql, stb_name, tb_name = self.genFullTypeSql(c0=t_type, t0=t_type)
@ -355,6 +361,7 @@ class TDTestCase:
please test :
binary_symbols = '\"abcd`~!@#$%^&*()_-{[}]|:;<.>?lfjal"\'\'"\"'
'''
self.cleanStb()
binary_symbols = '\"abcd`~!@#$%^&*()_-{[}]|:;<.>?lfjal"\"'
nchar_symbols = f'L{binary_symbols}'
input_sql, stb_name, tb_name = self.genFullTypeSql(c7=binary_symbols, c8=nchar_symbols, t7=binary_symbols, t8=nchar_symbols)
@ -365,6 +372,7 @@ class TDTestCase:
test ts list --> ["1626006833639000000ns", "1626006833639019us", "1626006833640ms", "1626006834s", "1626006822639022"]
# ! us级时间戳都为0时数据库中查询显示但python接口拿到的结果不显示 .000000的情况请确认,目前修改时间处理代码可以通过
"""
self.cleanStb()
ts_list = ["1626006833639000000ns", "1626006833639019us", "1626006833640ms", "1626006834s", "1626006822639022", 0]
for ts in ts_list:
input_sql, stb_name, tb_name = self.genFullTypeSql(ts=ts)
@ -375,6 +383,7 @@ class TDTestCase:
check id.index in tags
eg: t0=**,id=**,t1=**
"""
self.cleanStb()
input_sql, stb_name, tb_name = self.genFullTypeSql(id_change_tag=True)
self.resCmp(input_sql, stb_name)
@ -383,6 +392,7 @@ class TDTestCase:
check id param
eg: id and ID
"""
self.cleanStb()
input_sql, stb_name, tb_name = self.genFullTypeSql(id_upper_tag=True)
self.resCmp(input_sql, stb_name)
input_sql, stb_name, tb_name = self.genFullTypeSql(id_change_tag=True, id_upper_tag=True)
@ -392,6 +402,7 @@ class TDTestCase:
"""
id not exist
"""
self.cleanStb()
input_sql, stb_name, tb_name = self.genFullTypeSql(id_noexist_tag=True)
self.resCmp(input_sql, stb_name)
query_sql = f"select tbname from {stb_name}"
@ -408,6 +419,7 @@ class TDTestCase:
max tag count is 128
max col count is ??
"""
self.cleanStb()
input_sql, stb_name = self.genLongSql(128, 4000)
print(input_sql)
code = self._conn.insertLines([input_sql])
@ -425,6 +437,7 @@ class TDTestCase:
"""
test illegal id name
"""
self.cleanStb()
rstr = list("!@#$%^&*()-+={}|[]\:<>?")
for i in rstr:
input_sql = self.genFullTypeSql(tb_name=f"\"aaa{i}bbb\"")[0]
@ -435,6 +448,7 @@ class TDTestCase:
"""
id is start with num
"""
self.cleanStb()
input_sql = self.genFullTypeSql(tb_name=f"\"1aaabbb\"")[0]
code = self._conn.insertLines([input_sql])
tdSql.checkNotEqual(code, 0)
@ -443,6 +457,7 @@ class TDTestCase:
"""
check now unsupported
"""
self.cleanStb()
input_sql = self.genFullTypeSql(ts="now")[0]
code = self._conn.insertLines([input_sql])
tdSql.checkNotEqual(code, 0)
@ -451,6 +466,7 @@ class TDTestCase:
"""
check date format ts unsupported
"""
self.cleanStb()
input_sql = self.genFullTypeSql(ts="2021-07-21\ 19:01:46.920")[0]
code = self._conn.insertLines([input_sql])
tdSql.checkNotEqual(code, 0)
@ -459,6 +475,7 @@ class TDTestCase:
"""
check ts format like 16260068336390us19
"""
self.cleanStb()
input_sql = self.genFullTypeSql(ts="16260068336390us19")[0]
code = self._conn.insertLines([input_sql])
tdSql.checkNotEqual(code, 0)
@ -467,6 +484,7 @@ class TDTestCase:
"""
check full type tag value limit
"""
self.cleanStb()
# i8
for t1 in ["-127i8", "127i8"]:
input_sql, stb_name, tb_name = self.genFullTypeSql(t1=t1)
@ -545,6 +563,7 @@ class TDTestCase:
"""
check full type col value limit
"""
self.cleanStb()
# i8
for c1 in ["-127i8", "127i8"]:
input_sql, stb_name, tb_name = self.genFullTypeSql(c1=c1)
@ -628,6 +647,7 @@ class TDTestCase:
"""
test illegal tag col value
"""
self.cleanStb()
# bool
for i in ["TrUe", "tRue", "trUe", "truE", "FalsE", "fAlse", "faLse", "falSe", "falsE"]:
input_sql1 = self.genFullTypeSql(t0=i)[0]
@ -661,6 +681,7 @@ class TDTestCase:
"""
check duplicate Id Tag Col
"""
self.cleanStb()
input_sql_id = self.genFullTypeSql(id_double_tag=True)[0]
code = self._conn.insertLines([input_sql_id])
tdSql.checkNotEqual(code, 0)
@ -687,6 +708,7 @@ class TDTestCase:
"""
case no id when stb exist
"""
self.cleanStb()
input_sql, stb_name, tb_name = self.genFullTypeSql(t0="f", c0="f")
self.resCmp(input_sql, stb_name)
input_sql, stb_name, tb_name = self.genFullTypeSql(stb_name=stb_name, id_noexist_tag=True, t0="f", c0="f")
@ -699,6 +721,7 @@ class TDTestCase:
"""
check duplicate insert when stb exist
"""
self.cleanStb()
input_sql, stb_name, tb_name = self.genFullTypeSql()
self.resCmp(input_sql, stb_name)
code = self._conn.insertLines([input_sql])
@ -709,6 +732,7 @@ class TDTestCase:
"""
check length increase
"""
self.cleanStb()
input_sql, stb_name, tb_name = self.genFullTypeSql()
self.resCmp(input_sql, stb_name)
tb_name = self.getLongName(5, "letters")
@ -721,6 +745,7 @@ class TDTestCase:
"""
check column and tag count add, stb and tb duplicate
"""
self.cleanStb()
input_sql, stb_name, tb_name = self.genFullTypeSql(t0="f", c0="f")
print(input_sql)
self.resCmp(input_sql, stb_name)
@ -732,6 +757,7 @@ class TDTestCase:
"""
check column and tag count add
"""
self.cleanStb()
input_sql, stb_name, tb_name = self.genFullTypeSql(t0="f", c0="f")
self.resCmp(input_sql, stb_name)
input_sql, stb_name, tb_name_1 = self.genFullTypeSql(stb_name=stb_name, tb_name=f'{tb_name}_1', t0="f", c0="f", ct_add_tag=True)
@ -745,6 +771,7 @@ class TDTestCase:
condition: stb not change
insert two table, keep tag unchange, change col
"""
self.cleanStb()
input_sql, stb_name, tb_name = self.genFullTypeSql(t0="f", c0="f", id_noexist_tag=True)
self.resCmp(input_sql, stb_name)
tb_name1 = self.getNoIdTbName(stb_name)
@ -767,6 +794,7 @@ class TDTestCase:
# ? case finish , src bug exist
every binary and nchar must be length+2, so
"""
self.cleanStb()
stb_name = self.getLongName(7, "letters")
tb_name = f'{stb_name}_1'
input_sql = f'{stb_name},id="{tb_name}",t0=t c0=f 1626006833639000000ns'
@ -792,6 +820,10 @@ class TDTestCase:
# ? tag nchar max is 16384, col+ts nchar max 49151
def tagColNcharMaxLengthCheckCase(self):
"""
# ? case finish , src bug exist
"""
self.cleanStb()
stb_name = self.getLongName(7, "letters")
tb_name = f'{stb_name}_1'
input_sql = f'{stb_name},id="{tb_name}",t0=t c0=f 1626006833639000000ns'
@ -819,6 +851,7 @@ class TDTestCase:
"""
test batch insert
"""
self.cleanStb()
stb_name = self.getLongName(8, "letters")
tdSql.execute(f'create stable {stb_name}(ts timestamp, f int) tags(t1 bigint)')
lines = ["st123456,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
@ -839,6 +872,7 @@ class TDTestCase:
"""
test batch error insert
"""
self.cleanStb()
stb_name = self.getLongName(8, "letters")
lines = ["st123456,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
f"{stb_name},t2=5f64,t3=L\"ste\" c1=tRue,c2=4i64,c3=\"iam\" 1626056811823316532ns"]
@ -861,12 +895,22 @@ class TDTestCase:
s_stb_s_tb_list = list()
s_stb_s_tb_a_col_a_tag_list = list()
s_stb_s_tb_m_col_m_tag_list = list()
s_stb_d_tb_list = list()
s_stb_d_tb_a_col_m_tag_list = list()
s_stb_d_tb_a_tag_m_col_list = list()
s_stb_s_tb_d_ts_list = list()
for i in range(count):
d_stb_d_tb_list.append(self.genFullTypeSql(t0="f", c0="f"))
s_stb_s_tb_list.append(self.genFullTypeSql(stb_name=stb_name, tb_name=tb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'{self.getLongName(8, "letters")}"'))
s_stb_s_tb_a_col_a_tag_list.append(self.genFullTypeSql(stb_name=stb_name, tb_name=tb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'{self.getLongName(8, "letters")}"', ct_add_tag=True))
s_stb_s_tb_m_col_m_tag_list.append(self.genFullTypeSql(stb_name=stb_name, tb_name=tb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'{self.getLongName(8, "letters")}"', ct_min_tag=True))
return d_stb_d_tb_list, s_stb_s_tb_list, s_stb_s_tb_a_col_a_tag_list, s_stb_s_tb_m_col_m_tag_list
s_stb_s_tb_list.append(self.genFullTypeSql(stb_name=stb_name, tb_name=tb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'"{self.getLongName(8, "letters")}"'))
s_stb_s_tb_a_col_a_tag_list.append(self.genFullTypeSql(stb_name=stb_name, tb_name=tb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'"{self.getLongName(8, "letters")}"', ct_add_tag=True))
s_stb_s_tb_m_col_m_tag_list.append(self.genFullTypeSql(stb_name=stb_name, tb_name=tb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'"{self.getLongName(8, "letters")}"', ct_min_tag=True))
s_stb_d_tb_list.append(self.genFullTypeSql(stb_name=stb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'"{self.getLongName(8, "letters")}"', id_noexist_tag=True))
s_stb_d_tb_a_col_m_tag_list.append(self.genFullTypeSql(stb_name=stb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'"{self.getLongName(8, "letters")}"', id_noexist_tag=True, ct_am_tag=True))
s_stb_d_tb_a_tag_m_col_list.append(self.genFullTypeSql(stb_name=stb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'"{self.getLongName(8, "letters")}"', id_noexist_tag=True, ct_ma_tag=True))
s_stb_s_tb_d_ts_list.append(self.genFullTypeSql(stb_name=stb_name, tb_name=tb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'"{self.getLongName(8, "letters")}"', ts=0))
return d_stb_d_tb_list, s_stb_s_tb_list, s_stb_s_tb_a_col_a_tag_list, s_stb_s_tb_m_col_m_tag_list, s_stb_d_tb_list, s_stb_d_tb_a_col_m_tag_list, s_stb_d_tb_a_tag_m_col_list, s_stb_s_tb_d_ts_list
def genMultiThreadSeq(self, sql_list):
tlist = list()
@ -885,6 +929,7 @@ class TDTestCase:
"""
thread input different stb
"""
self.cleanStb()
input_sql = self.genSqlList()[0]
self.multiThreadRun(self.genMultiThreadSeq(input_sql))
tdSql.query(f"show tables;")
@ -894,6 +939,7 @@ class TDTestCase:
"""
thread input same stb tb, different data, result keep first data
"""
self.cleanStb()
input_sql, stb_name, tb_name = self.genFullTypeSql(tb_name=self.getLongName(10, "letters"))
self.resCmp(input_sql, stb_name)
s_stb_s_tb_list = self.genSqlList(stb_name=stb_name, tb_name=tb_name)[1]
@ -902,35 +948,110 @@ class TDTestCase:
tdSql.checkRows(1)
expected_tb_name = self.getNoIdTbName(stb_name)[0]
tdSql.checkEqual(tb_name, expected_tb_name)
tdSql.query(f"select * from {stb_name};")
tdSql.checkRows(1)
def sStbStbDdataAtcInsertMultiThreadCheckCase(self):
"""
thread input same stb tb, different data, add columes and tags, result keep first data
"""
input_sql, stb_name, tb_name = self.genFullTypeSql(tb_name=self.getLongName(10, "letters"))
self.resCmp(input_sql, stb_name)
s_stb_s_tb_a_col_a_tag_list = self.genSqlList(stb_name=stb_name, tb_name=tb_name)[2]
print(s_stb_s_tb_a_col_a_tag_list)
self.multiThreadRun(self.genMultiThreadSeq(s_stb_s_tb_a_col_a_tag_list))
tdSql.query(f"show tables;")
tdSql.checkRows(1)
expected_tb_name = self.getNoIdTbName(stb_name)[0]
tdSql.checkEqual(tb_name, expected_tb_name)
def sStbStbDdataMtcInsertMultiThreadCheckCase(self):
"""
thread input same stb tb, different data, add columes and tags, result keep first data
"""
self.cleanStb()
input_sql, stb_name, tb_name = self.genFullTypeSql(tb_name=self.getLongName(10, "letters"))
self.resCmp(input_sql, stb_name)
s_stb_s_tb_a_col_a_tag_list = self.genSqlList(stb_name=stb_name, tb_name=tb_name)[2]
self.multiThreadRun(self.genMultiThreadSeq(s_stb_s_tb_a_col_a_tag_list))
tdSql.query(f"show tables;")
tdSql.checkRows(1)
expected_tb_name = self.getNoIdTbName(stb_name)[0]
tdSql.checkEqual(tb_name, expected_tb_name)
tdSql.query(f"select * from {stb_name};")
tdSql.checkRows(1)
def sStbStbDdataMtcInsertMultiThreadCheckCase(self):
"""
thread input same stb tb, different data, minus columes and tags, result keep first data
"""
self.cleanStb()
input_sql, stb_name, tb_name = self.genFullTypeSql(tb_name=self.getLongName(10, "letters"))
self.resCmp(input_sql, stb_name)
s_stb_s_tb_m_col_m_tag_list = self.genSqlList(stb_name=stb_name, tb_name=tb_name)[3]
print(s_stb_s_tb_m_col_m_tag_list)
self.multiThreadRun(self.genMultiThreadSeq(s_stb_s_tb_m_col_m_tag_list))
tdSql.query(f"show tables;")
tdSql.checkRows(1)
expected_tb_name = self.getNoIdTbName(stb_name)[0]
tdSql.checkEqual(tb_name, expected_tb_name)
tdSql.query(f"select * from {stb_name};")
tdSql.checkRows(1)
def sStbDtbDdataInsertMultiThreadCheckCase(self):
"""
thread input same stb, different tb, different data
"""
self.cleanStb()
input_sql, stb_name, tb_name = self.genFullTypeSql()
self.resCmp(input_sql, stb_name)
s_stb_d_tb_list = self.genSqlList(stb_name=stb_name)[4]
self.multiThreadRun(self.genMultiThreadSeq(s_stb_d_tb_list))
tdSql.query(f"show tables;")
tdSql.checkRows(6)
def sStbDtbDdataAcMtInsertMultiThreadCheckCase(self):
"""
#! concurrency conflict
"""
"""
thread input same stb, different tb, different data, add col, mul tag
"""
self.cleanStb()
input_sql, stb_name, tb_name = self.genFullTypeSql()
self.resCmp(input_sql, stb_name)
s_stb_d_tb_a_col_m_tag_list = self.genSqlList(stb_name=stb_name)[5]
self.multiThreadRun(self.genMultiThreadSeq(s_stb_d_tb_a_col_m_tag_list))
tdSql.query(f"show tables;")
tdSql.checkRows(6)
def sStbDtbDdataAtMcInsertMultiThreadCheckCase(self):
"""
#! concurrency conflict
"""
"""
thread input same stb, different tb, different data, add tag, mul col
"""
self.cleanStb()
input_sql, stb_name, tb_name = self.genFullTypeSql()
self.resCmp(input_sql, stb_name)
s_stb_d_tb_a_tag_m_col_list = self.genSqlList(stb_name=stb_name)[6]
self.multiThreadRun(self.genMultiThreadSeq(s_stb_d_tb_a_tag_m_col_list))
tdSql.query(f"show tables;")
tdSql.checkRows(6)
def sStbStbDdataDtsInsertMultiThreadCheckCase(self):
"""
thread input same stb tb, different ts
"""
self.cleanStb()
input_sql, stb_name, tb_name = self.genFullTypeSql(tb_name=self.getLongName(10, "letters"))
self.resCmp(input_sql, stb_name)
s_stb_s_tb_d_ts_list = self.genSqlList(stb_name=stb_name, tb_name=tb_name)[7]
print(s_stb_s_tb_d_ts_list)
self.multiThreadRun(self.genMultiThreadSeq(s_stb_s_tb_d_ts_list))
tdSql.query(f"show tables;")
tdSql.checkRows(1)
tdSql.query(f"select * from {stb_name}")
tdSql.checkRows(6)
def test(self):
input_sql1 = "rfasta,id=\"rfasta_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"ddzhiksj\",t8=L\"ncharTagValue\" c0=True,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"bnhwlgvj\",c8=L\"ncharTagValue\",c9=7u64 1626006833639000000ns"
input_sql2 = "rfasta,t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"ddzhiksj\",t8=L\"ncharTagValue\" c0=True,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"bnhwlgvj\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000ns"
input_sql3 = 'hmemeb,id="kilrcrldgf",t0=True,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7="ndsfdrum",t8=L"ncharTagValue" c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="igwoehkm",c8=L"ncharColValue",c9=7u64 0'
input_sql4 = 'hmemeb,id="kilrcrldgf",t0=F,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7="fysodjql",t8=L"ncharTagValue" c0=True,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="waszbfvc",c8=L"ncharColValue",c9=7u64 0'
self._conn.insertLines([input_sql3])
self._conn.insertLines([input_sql4])
def run(self):
print("running {}".format(__file__))
@ -982,10 +1103,23 @@ class TDTestCase:
# ! bug
# self.batchErrorInsertCheckCase()
self.stbInsertMultiThreadCheckCase()
# self.stbInsertMultiThreadCheckCase()
# self.sStbStbDdataInsertMultiThreadCheckCase()
# self.sStbStbDdataAtcInsertMultiThreadCheckCase()
self.sStbStbDdataMtcInsertMultiThreadCheckCase()
# self.sStbStbDdataMtcInsertMultiThreadCheckCase()
# self.sStbDtbDdataInsertMultiThreadCheckCase()
# ! concurrency conflict
# self.sStbDtbDdataAcMtInsertMultiThreadCheckCase()
# self.sStbDtbDdataAtMcInsertMultiThreadCheckCase()
# ! concurrency conflict
# self.sStbStbDdataDtsInsertMultiThreadCheckCase()
# self.test()