From 63c9b2069e6755fff9ae7cec9e7a132af7be8f40 Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Sat, 24 Jul 2021 19:39:20 +0800 Subject: [PATCH] add multi thread --- tests/pytest/insert/schemalessInsert.py | 182 ++++++++++++++++++++---- 1 file changed, 158 insertions(+), 24 deletions(-) diff --git a/tests/pytest/insert/schemalessInsert.py b/tests/pytest/insert/schemalessInsert.py index b6b8f57829..a46aa40f0f 100644 --- a/tests/pytest/insert/schemalessInsert.py +++ b/tests/pytest/insert/schemalessInsert.py @@ -245,8 +245,12 @@ class TDTestCase: sql_seq = f'{stb_name},{id}=\"{tb_name}\",t0={t0},t1={t1},t2={t2},t3={t3},t4={t4},t5={t5},t6={t6},t7={t7},t8={t8},t11={t1},t10={t8} c0={c0},c1={c1},c2={c2},c3={c3},c4={c4},c5={c5},c6={c6},c7={c7},c8={c8},c9={c9},c11={c8},c10={t0} {ts}' if ct_am_tag is not None: sql_seq = f'{stb_name},{id}=\"{tb_name}\",t0={t0},t1={t1},t2={t2},t3={t3},t4={t4},t5={t5},t6={t6} c0={c0},c1={c1},c2={c2},c3={c3},c4={c4},c5={c5},c6={c6},c7={c7},c8={c8},c9={c9},c11={c8},c10={t0} {ts}' + if id_noexist_tag is not None: + sql_seq = f'{stb_name},t0={t0},t1={t1},t2={t2},t3={t3},t4={t4},t5={t5},t6={t6} c0={c0},c1={c1},c2={c2},c3={c3},c4={c4},c5={c5},c6={c6},c7={c7},c8={c8},c9={c9},c11={c8},c10={t0} {ts}' if ct_ma_tag is not None: sql_seq = f'{stb_name},{id}=\"{tb_name}\",t0={t0},t1={t1},t2={t2},t3={t3},t4={t4},t5={t5},t6={t6},t7={t7},t8={t8},t11={t1},t10={t8} c0={c0},c1={c1},c2={c2},c3={c3},c4={c4},c5={c5},c6={c6} {ts}' + if id_noexist_tag is not None: + sql_seq = f'{stb_name},t0={t0},t1={t1},t2={t2},t3={t3},t4={t4},t5={t5},t6={t6},t7={t7},t8={t8},t11={t1},t10={t8} c0={c0},c1={c1},c2={c2},c3={c3},c4={c4},c5={c5},c6={c6} {ts}' if ct_min_tag is not None: sql_seq = f'{stb_name},{id}=\"{tb_name}\",t0={t0},t1={t1},t2={t2},t3={t3},t4={t4},t5={t5},t6={t6} c0={c0},c1={c1},c2={c2},c3={c3},c4={c4},c5={c5},c6={c6} {ts}' return sql_seq, stb_name, tb_name @@ -326,15 +330,16 @@ class TDTestCase: def cleanStb(self): query_sql = "show stables" - res_row_list = self.resHandle(query_sql, True)[0] - print(res_row_list) - for stb in res_row_list: + res_row_list = tdSql.query(query_sql, True) + stb_list = map(lambda x: x[0], res_row_list) + for stb in stb_list: tdSql.execute(f'drop table if exists {stb}') def initCheckCase(self): """ normal tags and cols, one for every elm """ + self.cleanStb() input_sql, stb_name, tb_name = self.genFullTypeSql() self.resCmp(input_sql, stb_name) @@ -342,6 +347,7 @@ class TDTestCase: """ check all normal type """ + self.cleanStb() full_type_list = ["f", "F", "false", "False", "t", "T", "true", "True"] for t_type in full_type_list: input_sql, stb_name, tb_name = self.genFullTypeSql(c0=t_type, t0=t_type) @@ -355,6 +361,7 @@ class TDTestCase: please test : binary_symbols = '\"abcd`~!@#$%^&*()_-{[}]|:;<.>?lfjal"\'\'"\"' ''' + self.cleanStb() binary_symbols = '\"abcd`~!@#$%^&*()_-{[}]|:;<.>?lfjal"\"' nchar_symbols = f'L{binary_symbols}' input_sql, stb_name, tb_name = self.genFullTypeSql(c7=binary_symbols, c8=nchar_symbols, t7=binary_symbols, t8=nchar_symbols) @@ -365,6 +372,7 @@ class TDTestCase: test ts list --> ["1626006833639000000ns", "1626006833639019us", "1626006833640ms", "1626006834s", "1626006822639022"] # ! us级时间戳都为0时,数据库中查询显示,但python接口拿到的结果不显示 .000000的情况请确认,目前修改时间处理代码可以通过 """ + self.cleanStb() ts_list = ["1626006833639000000ns", "1626006833639019us", "1626006833640ms", "1626006834s", "1626006822639022", 0] for ts in ts_list: input_sql, stb_name, tb_name = self.genFullTypeSql(ts=ts) @@ -375,6 +383,7 @@ class TDTestCase: check id.index in tags eg: t0=**,id=**,t1=** """ + self.cleanStb() input_sql, stb_name, tb_name = self.genFullTypeSql(id_change_tag=True) self.resCmp(input_sql, stb_name) @@ -383,6 +392,7 @@ class TDTestCase: check id param eg: id and ID """ + self.cleanStb() input_sql, stb_name, tb_name = self.genFullTypeSql(id_upper_tag=True) self.resCmp(input_sql, stb_name) input_sql, stb_name, tb_name = self.genFullTypeSql(id_change_tag=True, id_upper_tag=True) @@ -392,6 +402,7 @@ class TDTestCase: """ id not exist """ + self.cleanStb() input_sql, stb_name, tb_name = self.genFullTypeSql(id_noexist_tag=True) self.resCmp(input_sql, stb_name) query_sql = f"select tbname from {stb_name}" @@ -408,6 +419,7 @@ class TDTestCase: max tag count is 128 max col count is ?? """ + self.cleanStb() input_sql, stb_name = self.genLongSql(128, 4000) print(input_sql) code = self._conn.insertLines([input_sql]) @@ -425,6 +437,7 @@ class TDTestCase: """ test illegal id name """ + self.cleanStb() rstr = list("!@#$%^&*()-+={}|[]\:<>?") for i in rstr: input_sql = self.genFullTypeSql(tb_name=f"\"aaa{i}bbb\"")[0] @@ -435,6 +448,7 @@ class TDTestCase: """ id is start with num """ + self.cleanStb() input_sql = self.genFullTypeSql(tb_name=f"\"1aaabbb\"")[0] code = self._conn.insertLines([input_sql]) tdSql.checkNotEqual(code, 0) @@ -443,6 +457,7 @@ class TDTestCase: """ check now unsupported """ + self.cleanStb() input_sql = self.genFullTypeSql(ts="now")[0] code = self._conn.insertLines([input_sql]) tdSql.checkNotEqual(code, 0) @@ -451,6 +466,7 @@ class TDTestCase: """ check date format ts unsupported """ + self.cleanStb() input_sql = self.genFullTypeSql(ts="2021-07-21\ 19:01:46.920")[0] code = self._conn.insertLines([input_sql]) tdSql.checkNotEqual(code, 0) @@ -459,6 +475,7 @@ class TDTestCase: """ check ts format like 16260068336390us19 """ + self.cleanStb() input_sql = self.genFullTypeSql(ts="16260068336390us19")[0] code = self._conn.insertLines([input_sql]) tdSql.checkNotEqual(code, 0) @@ -467,6 +484,7 @@ class TDTestCase: """ check full type tag value limit """ + self.cleanStb() # i8 for t1 in ["-127i8", "127i8"]: input_sql, stb_name, tb_name = self.genFullTypeSql(t1=t1) @@ -545,6 +563,7 @@ class TDTestCase: """ check full type col value limit """ + self.cleanStb() # i8 for c1 in ["-127i8", "127i8"]: input_sql, stb_name, tb_name = self.genFullTypeSql(c1=c1) @@ -628,6 +647,7 @@ class TDTestCase: """ test illegal tag col value """ + self.cleanStb() # bool for i in ["TrUe", "tRue", "trUe", "truE", "FalsE", "fAlse", "faLse", "falSe", "falsE"]: input_sql1 = self.genFullTypeSql(t0=i)[0] @@ -661,6 +681,7 @@ class TDTestCase: """ check duplicate Id Tag Col """ + self.cleanStb() input_sql_id = self.genFullTypeSql(id_double_tag=True)[0] code = self._conn.insertLines([input_sql_id]) tdSql.checkNotEqual(code, 0) @@ -687,6 +708,7 @@ class TDTestCase: """ case no id when stb exist """ + self.cleanStb() input_sql, stb_name, tb_name = self.genFullTypeSql(t0="f", c0="f") self.resCmp(input_sql, stb_name) input_sql, stb_name, tb_name = self.genFullTypeSql(stb_name=stb_name, id_noexist_tag=True, t0="f", c0="f") @@ -699,6 +721,7 @@ class TDTestCase: """ check duplicate insert when stb exist """ + self.cleanStb() input_sql, stb_name, tb_name = self.genFullTypeSql() self.resCmp(input_sql, stb_name) code = self._conn.insertLines([input_sql]) @@ -709,6 +732,7 @@ class TDTestCase: """ check length increase """ + self.cleanStb() input_sql, stb_name, tb_name = self.genFullTypeSql() self.resCmp(input_sql, stb_name) tb_name = self.getLongName(5, "letters") @@ -721,6 +745,7 @@ class TDTestCase: """ check column and tag count add, stb and tb duplicate """ + self.cleanStb() input_sql, stb_name, tb_name = self.genFullTypeSql(t0="f", c0="f") print(input_sql) self.resCmp(input_sql, stb_name) @@ -732,6 +757,7 @@ class TDTestCase: """ check column and tag count add """ + self.cleanStb() input_sql, stb_name, tb_name = self.genFullTypeSql(t0="f", c0="f") self.resCmp(input_sql, stb_name) input_sql, stb_name, tb_name_1 = self.genFullTypeSql(stb_name=stb_name, tb_name=f'{tb_name}_1', t0="f", c0="f", ct_add_tag=True) @@ -745,6 +771,7 @@ class TDTestCase: condition: stb not change insert two table, keep tag unchange, change col """ + self.cleanStb() input_sql, stb_name, tb_name = self.genFullTypeSql(t0="f", c0="f", id_noexist_tag=True) self.resCmp(input_sql, stb_name) tb_name1 = self.getNoIdTbName(stb_name) @@ -767,6 +794,7 @@ class TDTestCase: # ? case finish , src bug exist every binary and nchar must be length+2, so """ + self.cleanStb() stb_name = self.getLongName(7, "letters") tb_name = f'{stb_name}_1' input_sql = f'{stb_name},id="{tb_name}",t0=t c0=f 1626006833639000000ns' @@ -792,6 +820,10 @@ class TDTestCase: # ? tag nchar max is 16384, col+ts nchar max 49151 def tagColNcharMaxLengthCheckCase(self): + """ + # ? case finish , src bug exist + """ + self.cleanStb() stb_name = self.getLongName(7, "letters") tb_name = f'{stb_name}_1' input_sql = f'{stb_name},id="{tb_name}",t0=t c0=f 1626006833639000000ns' @@ -819,6 +851,7 @@ class TDTestCase: """ test batch insert """ + self.cleanStb() stb_name = self.getLongName(8, "letters") tdSql.execute(f'create stable {stb_name}(ts timestamp, f int) tags(t1 bigint)') lines = ["st123456,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns", @@ -839,6 +872,7 @@ class TDTestCase: """ test batch error insert """ + self.cleanStb() stb_name = self.getLongName(8, "letters") lines = ["st123456,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns", f"{stb_name},t2=5f64,t3=L\"ste\" c1=tRue,c2=4i64,c3=\"iam\" 1626056811823316532ns"] @@ -861,12 +895,22 @@ class TDTestCase: s_stb_s_tb_list = list() s_stb_s_tb_a_col_a_tag_list = list() s_stb_s_tb_m_col_m_tag_list = list() + s_stb_d_tb_list = list() + s_stb_d_tb_a_col_m_tag_list = list() + s_stb_d_tb_a_tag_m_col_list = list() + s_stb_s_tb_d_ts_list = list() for i in range(count): d_stb_d_tb_list.append(self.genFullTypeSql(t0="f", c0="f")) - s_stb_s_tb_list.append(self.genFullTypeSql(stb_name=stb_name, tb_name=tb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'{self.getLongName(8, "letters")}"')) - s_stb_s_tb_a_col_a_tag_list.append(self.genFullTypeSql(stb_name=stb_name, tb_name=tb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'{self.getLongName(8, "letters")}"', ct_add_tag=True)) - s_stb_s_tb_m_col_m_tag_list.append(self.genFullTypeSql(stb_name=stb_name, tb_name=tb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'{self.getLongName(8, "letters")}"', ct_min_tag=True)) - return d_stb_d_tb_list, s_stb_s_tb_list, s_stb_s_tb_a_col_a_tag_list, s_stb_s_tb_m_col_m_tag_list + s_stb_s_tb_list.append(self.genFullTypeSql(stb_name=stb_name, tb_name=tb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'"{self.getLongName(8, "letters")}"')) + s_stb_s_tb_a_col_a_tag_list.append(self.genFullTypeSql(stb_name=stb_name, tb_name=tb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'"{self.getLongName(8, "letters")}"', ct_add_tag=True)) + s_stb_s_tb_m_col_m_tag_list.append(self.genFullTypeSql(stb_name=stb_name, tb_name=tb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'"{self.getLongName(8, "letters")}"', ct_min_tag=True)) + s_stb_d_tb_list.append(self.genFullTypeSql(stb_name=stb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'"{self.getLongName(8, "letters")}"', id_noexist_tag=True)) + s_stb_d_tb_a_col_m_tag_list.append(self.genFullTypeSql(stb_name=stb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'"{self.getLongName(8, "letters")}"', id_noexist_tag=True, ct_am_tag=True)) + s_stb_d_tb_a_tag_m_col_list.append(self.genFullTypeSql(stb_name=stb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'"{self.getLongName(8, "letters")}"', id_noexist_tag=True, ct_ma_tag=True)) + s_stb_s_tb_d_ts_list.append(self.genFullTypeSql(stb_name=stb_name, tb_name=tb_name, t7=f'"{self.getLongName(8, "letters")}"', c7=f'"{self.getLongName(8, "letters")}"', ts=0)) + + return d_stb_d_tb_list, s_stb_s_tb_list, s_stb_s_tb_a_col_a_tag_list, s_stb_s_tb_m_col_m_tag_list, s_stb_d_tb_list, s_stb_d_tb_a_col_m_tag_list, s_stb_d_tb_a_tag_m_col_list, s_stb_s_tb_d_ts_list + def genMultiThreadSeq(self, sql_list): tlist = list() @@ -885,6 +929,7 @@ class TDTestCase: """ thread input different stb """ + self.cleanStb() input_sql = self.genSqlList()[0] self.multiThreadRun(self.genMultiThreadSeq(input_sql)) tdSql.query(f"show tables;") @@ -894,6 +939,7 @@ class TDTestCase: """ thread input same stb tb, different data, result keep first data """ + self.cleanStb() input_sql, stb_name, tb_name = self.genFullTypeSql(tb_name=self.getLongName(10, "letters")) self.resCmp(input_sql, stb_name) s_stb_s_tb_list = self.genSqlList(stb_name=stb_name, tb_name=tb_name)[1] @@ -902,35 +948,110 @@ class TDTestCase: tdSql.checkRows(1) expected_tb_name = self.getNoIdTbName(stb_name)[0] tdSql.checkEqual(tb_name, expected_tb_name) + tdSql.query(f"select * from {stb_name};") + tdSql.checkRows(1) def sStbStbDdataAtcInsertMultiThreadCheckCase(self): - """ - thread input same stb tb, different data, add columes and tags, result keep first data - """ - input_sql, stb_name, tb_name = self.genFullTypeSql(tb_name=self.getLongName(10, "letters")) - self.resCmp(input_sql, stb_name) - s_stb_s_tb_a_col_a_tag_list = self.genSqlList(stb_name=stb_name, tb_name=tb_name)[2] - print(s_stb_s_tb_a_col_a_tag_list) - self.multiThreadRun(self.genMultiThreadSeq(s_stb_s_tb_a_col_a_tag_list)) - tdSql.query(f"show tables;") - tdSql.checkRows(1) - expected_tb_name = self.getNoIdTbName(stb_name)[0] - tdSql.checkEqual(tb_name, expected_tb_name) - - def sStbStbDdataMtcInsertMultiThreadCheckCase(self): """ thread input same stb tb, different data, add columes and tags, result keep first data """ self.cleanStb() input_sql, stb_name, tb_name = self.genFullTypeSql(tb_name=self.getLongName(10, "letters")) self.resCmp(input_sql, stb_name) + s_stb_s_tb_a_col_a_tag_list = self.genSqlList(stb_name=stb_name, tb_name=tb_name)[2] + self.multiThreadRun(self.genMultiThreadSeq(s_stb_s_tb_a_col_a_tag_list)) + tdSql.query(f"show tables;") + tdSql.checkRows(1) + expected_tb_name = self.getNoIdTbName(stb_name)[0] + tdSql.checkEqual(tb_name, expected_tb_name) + tdSql.query(f"select * from {stb_name};") + tdSql.checkRows(1) + + def sStbStbDdataMtcInsertMultiThreadCheckCase(self): + """ + thread input same stb tb, different data, minus columes and tags, result keep first data + """ + self.cleanStb() + input_sql, stb_name, tb_name = self.genFullTypeSql(tb_name=self.getLongName(10, "letters")) + self.resCmp(input_sql, stb_name) s_stb_s_tb_m_col_m_tag_list = self.genSqlList(stb_name=stb_name, tb_name=tb_name)[3] - print(s_stb_s_tb_m_col_m_tag_list) self.multiThreadRun(self.genMultiThreadSeq(s_stb_s_tb_m_col_m_tag_list)) tdSql.query(f"show tables;") tdSql.checkRows(1) expected_tb_name = self.getNoIdTbName(stb_name)[0] tdSql.checkEqual(tb_name, expected_tb_name) + tdSql.query(f"select * from {stb_name};") + tdSql.checkRows(1) + + def sStbDtbDdataInsertMultiThreadCheckCase(self): + """ + thread input same stb, different tb, different data + """ + self.cleanStb() + input_sql, stb_name, tb_name = self.genFullTypeSql() + self.resCmp(input_sql, stb_name) + s_stb_d_tb_list = self.genSqlList(stb_name=stb_name)[4] + self.multiThreadRun(self.genMultiThreadSeq(s_stb_d_tb_list)) + tdSql.query(f"show tables;") + tdSql.checkRows(6) + + def sStbDtbDdataAcMtInsertMultiThreadCheckCase(self): + """ + #! concurrency conflict + """ + """ + thread input same stb, different tb, different data, add col, mul tag + """ + self.cleanStb() + input_sql, stb_name, tb_name = self.genFullTypeSql() + self.resCmp(input_sql, stb_name) + s_stb_d_tb_a_col_m_tag_list = self.genSqlList(stb_name=stb_name)[5] + self.multiThreadRun(self.genMultiThreadSeq(s_stb_d_tb_a_col_m_tag_list)) + tdSql.query(f"show tables;") + tdSql.checkRows(6) + + def sStbDtbDdataAtMcInsertMultiThreadCheckCase(self): + """ + #! concurrency conflict + """ + """ + thread input same stb, different tb, different data, add tag, mul col + """ + self.cleanStb() + input_sql, stb_name, tb_name = self.genFullTypeSql() + self.resCmp(input_sql, stb_name) + s_stb_d_tb_a_tag_m_col_list = self.genSqlList(stb_name=stb_name)[6] + self.multiThreadRun(self.genMultiThreadSeq(s_stb_d_tb_a_tag_m_col_list)) + tdSql.query(f"show tables;") + tdSql.checkRows(6) + + def sStbStbDdataDtsInsertMultiThreadCheckCase(self): + """ + thread input same stb tb, different ts + """ + self.cleanStb() + input_sql, stb_name, tb_name = self.genFullTypeSql(tb_name=self.getLongName(10, "letters")) + self.resCmp(input_sql, stb_name) + s_stb_s_tb_d_ts_list = self.genSqlList(stb_name=stb_name, tb_name=tb_name)[7] + print(s_stb_s_tb_d_ts_list) + self.multiThreadRun(self.genMultiThreadSeq(s_stb_s_tb_d_ts_list)) + tdSql.query(f"show tables;") + tdSql.checkRows(1) + tdSql.query(f"select * from {stb_name}") + tdSql.checkRows(6) + + + + def test(self): + input_sql1 = "rfasta,id=\"rfasta_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"ddzhiksj\",t8=L\"ncharTagValue\" c0=True,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"bnhwlgvj\",c8=L\"ncharTagValue\",c9=7u64 1626006833639000000ns" + + input_sql2 = "rfasta,t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"ddzhiksj\",t8=L\"ncharTagValue\" c0=True,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"bnhwlgvj\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000ns" + input_sql3 = 'hmemeb,id="kilrcrldgf",t0=True,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7="ndsfdrum",t8=L"ncharTagValue" c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="igwoehkm",c8=L"ncharColValue",c9=7u64 0' + input_sql4 = 'hmemeb,id="kilrcrldgf",t0=F,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7="fysodjql",t8=L"ncharTagValue" c0=True,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7="waszbfvc",c8=L"ncharColValue",c9=7u64 0' + + + self._conn.insertLines([input_sql3]) + self._conn.insertLines([input_sql4]) def run(self): print("running {}".format(__file__)) @@ -982,10 +1103,23 @@ class TDTestCase: # ! bug # self.batchErrorInsertCheckCase() - self.stbInsertMultiThreadCheckCase() + # self.stbInsertMultiThreadCheckCase() # self.sStbStbDdataInsertMultiThreadCheckCase() # self.sStbStbDdataAtcInsertMultiThreadCheckCase() - self.sStbStbDdataMtcInsertMultiThreadCheckCase() + # self.sStbStbDdataMtcInsertMultiThreadCheckCase() + # self.sStbDtbDdataInsertMultiThreadCheckCase() + + # ! concurrency conflict + # self.sStbDtbDdataAcMtInsertMultiThreadCheckCase() + # self.sStbDtbDdataAtMcInsertMultiThreadCheckCase() + # ! concurrency conflict + + # self.sStbStbDdataDtsInsertMultiThreadCheckCase() + # self.test() + + + +