From 9017f55820c5576db3764732e3e51de3cc26b92c Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Wed, 8 Jun 2022 18:35:01 +0800 Subject: [PATCH 01/30] add case for tag compute for all --- tests/system-test/2-query/abs.py | 80 ++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/tests/system-test/2-query/abs.py b/tests/system-test/2-query/abs.py index a3e976b490..b1600b4ee1 100644 --- a/tests/system-test/2-query/abs.py +++ b/tests/system-test/2-query/abs.py @@ -65,6 +65,60 @@ class TDTestCase: ''' ) + + def prepare_tag_datas(self): + # prepare datas + tdSql.execute("create database if not exists testdb keep 3650 days 1000") + tdSql.execute(" use testdb ") + tdSql.execute( + '''create table stb1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + tags (t0 timestamp, t1 int, t2 bigint, t3 smallint, t4 tinyint, t5 float, t6 double, t7 bool, t8 binary(16),t9 nchar(32)) + ''' + ) + + tdSql.execute( + ''' + create table t1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + ''' + ) + for i in range(4): + tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )') + + for i in range(9): + tdSql.execute( + f"insert into ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )" + ) + tdSql.execute( + f"insert into ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )" + ) + tdSql.execute("insert into ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )") + tdSql.execute("insert into ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )") + tdSql.execute("insert into ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )") + tdSql.execute("insert into ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )") + + tdSql.execute("insert into ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + tdSql.execute("insert into ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + tdSql.execute("insert into ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + + tdSql.execute( + f'''insert into t1 values + ( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a ) + ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a ) + ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a ) + ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a ) + ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a ) + ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a ) + ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" ) + ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" ) + ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" ) + ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ''' + ) + def check_result_auto(self ,origin_query , abs_query): abs_result = tdSql.getResult(abs_query) origin_result = tdSql.getResult(origin_query) @@ -95,6 +149,7 @@ class TDTestCase: tdLog.info("abs value check pass , it work as expected ,sql is \"%s\" "%abs_query ) def test_errors(self): + tdSql.execute("use testdb") error_sql_lists = [ "select abs from t1", # "select abs(-+--+c1) from t1", @@ -129,11 +184,16 @@ class TDTestCase: tdSql.error(error_sql) def support_types(self): + tdSql.execute("use testdb") type_error_sql_lists = [ "select abs(ts) from t1" , + "select abs(t0) from t1" , "select abs(c7) from t1", "select abs(c8) from t1", "select abs(c9) from t1", + "select abs(t7) from t1", + "select abs(t8) from t1", + "select abs(t9) from t1", "select abs(ts) from ct1" , "select abs(c7) from ct1", "select abs(c8) from ct1", @@ -171,6 +231,13 @@ class TDTestCase: "select abs(c5) from t1", "select abs(c6) from t1", + "select abs(t1) from ct1", + "select abs(t2) from ct1", + "select abs(t3) from ct1", + "select abs(t4) from ct1", + "select abs(t5) from ct1", + "select abs(t6) from ct1", + "select abs(c1) from ct1", "select abs(c2) from ct1", "select abs(c3) from ct1", @@ -447,6 +514,14 @@ class TDTestCase: self.check_result_auto("select c1+1 ,c2 , c3*1 , c4/2, c5/2, c6 from sub1_bound" ,"select abs(c1+1) ,abs(c2) , abs(c3*1) , abs(c4/2), abs(c5)/2, abs(c6) from sub1_bound ") + def test_tag_compute_for_scalar_function(self): + + tdSql.execute("use testdb") + + self.check_result_auto( "select c1, t2, t3 , t4, t5 from ct4 ", "select (c1), abs(t2) ,abs(t3), abs(t4), abs(t5) from ct4") + self.check_result_auto( "select c1+2, t2+2, t3 , t4, t5 from ct4 ", "select (c1)+2, abs(t2)+2 ,abs(t3), abs(t4), abs(t5) from ct4") + self.check_result_auto( "select c1+2, t2+2, t3 , t4, t5 from stb1 order by t1 ", "select (c1)+2, abs(t2)+2 ,abs(t3), abs(t4), abs(t5) from stb1 order by t1") + def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring tdSql.prepare() @@ -454,6 +529,7 @@ class TDTestCase: tdLog.printNoPrefix("==========step1:create table ==============") self.prepare_datas() + self.prepare_tag_datas() tdLog.printNoPrefix("==========step2:test errors ==============") @@ -475,6 +551,10 @@ class TDTestCase: self.abs_func_filter() + tdLog.printNoPrefix("==========step6: tag coumpute query ============") + + self.test_tag_compute_for_scalar_function() + def stop(self): tdSql.close() tdLog.success(f"{__file__} successfully executed") From 6cff07189bebe3636e5eb05ec7a6bad1ffe80412 Mon Sep 17 00:00:00 2001 From: jiacy-jcy Date: Thu, 9 Jun 2022 09:57:46 +0800 Subject: [PATCH 02/30] update to_iso8601.py --- tests/system-test/2-query/To_iso8601.py | 159 +++++++++++------------- 1 file changed, 74 insertions(+), 85 deletions(-) diff --git a/tests/system-test/2-query/To_iso8601.py b/tests/system-test/2-query/To_iso8601.py index 57bcca638c..32c9c216a0 100644 --- a/tests/system-test/2-query/To_iso8601.py +++ b/tests/system-test/2-query/To_iso8601.py @@ -3,7 +3,7 @@ from time import sleep from util.log import * from util.sql import * from util.cases import * - +import os @@ -12,28 +12,54 @@ class TDTestCase: def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) - - def run(self): # sourcery skip: extract-duplicate-method - tdSql.prepare() - # get system timezone - today_date = datetime.datetime.strptime( - datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d") + self.rowNum = 10 + self.ts = 1640966400000 # 2022-1-1 00:00:00.000 + def check_customize_param_ms(self): + # today_date = datetime.datetime.strptime( + # datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d") + # print(today_date) + time_zone = (os.popen('timedatectl | grep zone').read().strip().split(',')[1].lstrip())[0:5] + print(time_zone) + tdSql.execute('create database db1 precision "ms"') + tdSql.execute('use db1') + tdSql.execute( + 'create table if not exists ntb(ts timestamp, c1 int, c2 timestamp)' + ) + for i in range(self.rowNum): + tdSql.execute("insert into ntb values(%d, %d, %d)" + % (self.ts + i, i + 1, self.ts + i)) + tdSql.query('select to_iso8601(ts) from ntb') + print(tdSql.queryResult) + for i in range(self.rowNum): + tdSql.checkEqual(tdSql.queryResult[i][0],f'2022-01-01T00:00:00.00{i}{time_zone}') + for j in ['z','Z']: + tdSql.query('select to_iso8601(ts,"{j}") from ntb') + print(tdSql.queryResult) + for i in range(self.rowNum): + tdSql.checkEqual(tdSql.queryResult[i][0],f'2022-01-01T00:00:00.00{i}{time_zone}') + + timezone_list = ['+0000','+0100','+0200','+0300','+0330','+0400','+0500','+0530','+0600','+0700','+0800','+0900','+1000','+1100','+1200',\ + '+00','+01','+02','+03','+04','+05','+06','+07','+08','+09','+10','+11','+12',\ + '+00:00','+01:00','+02:00','+03:00','+03:30','+04:00','+05:00','+05:30','+06:00','+07:00','+08:00','+09:00','+10:00','+11:00','+12:00',\ + '-0000','-0100','-0200','-0300','-0400','-0500','-0600','-0700','-0800','-0900','-1000','-1100','-1200',\ + '-00','-01','-02','-03','-04','-05','-06','-07','-08','-09','-10','-11','-12',\ + '-00:00','-01:00','-02:00','-03:00','-04:00','-05:00','-06:00','-07:00','-08:00','-09:00','-10:00','-11:00','-12:00'] + for j in timezone_list: + tdSql.query(f'select to_iso8601(ts,"{j}") from ntb') + for i in range(self.rowNum): + tdSql.checkEqual(tdSql.queryResult[i][0],f'2022-01-01T00:00:00.00{i}{time_zone}') + + error_param_list = [0,100,5,'a','+13','+0101','+00:01','-0001','-13','-00:01'] + for i in error_param_list: + tdSql.error(f'select to_iso8601(ts,"{error_param_list}") from ntb') + + def check_base_function(self): + tdSql.prepare() tdLog.printNoPrefix("==========step1:create tables==========") - tdSql.execute( - '''create table if not exists ntb - (ts timestamp, c1 int, c2 float,c3 double,c4 timestamp) - ''' - ) - tdSql.execute( - '''create table if not exists stb - (ts timestamp, c1 int, c2 float,c3 double,c4 timestamp) tags(t0 int) - ''' - ) - tdSql.execute( - '''create table if not exists stb_1 using stb tags(100) - ''' - ) + tdSql.execute('create table if not exists ntb(ts timestamp, c1 int, c2 float,c3 double,c4 timestamp)') + tdSql.execute('create table if not exists stb(ts timestamp, c1 int, c2 float,c3 double,c4 timestamp) tags(t0 int)') + tdSql.execute('create table if not exists stb_1 using stb tags(100)') tdLog.printNoPrefix("==========step2:insert data==========") tdSql.execute('insert into ntb values(now,1,1.55,100.555555,today())("2020-1-1 00:00:00",10,11.11,99.999999,now())(today(),3,3.333,333.333333,now())') @@ -67,53 +93,33 @@ class TDTestCase: tdSql.error("select to_iso8601(timezone()) from ntb") tdSql.error("select to_iso8601('abc') from ntb") - tdSql.query("select to_iso8601(today()) *null from ntb") - tdSql.checkRows(3) - tdSql.checkData(0,0,None) - tdSql.query("select to_iso8601(today()) +null from ntb") - tdSql.checkRows(3) - tdSql.checkData(0,0,None) - tdSql.query("select to_iso8601(today()) -null from ntb") - tdSql.checkRows(3) - tdSql.checkData(0,0,None) - tdSql.query("select to_iso8601(today()) /null from ntb") - tdSql.checkRows(3) - tdSql.checkData(0,0,None) - - tdSql.query("select to_iso8601(today()) *null from db.ntb") - tdSql.checkRows(3) - tdSql.checkData(0,0,None) - tdSql.query("select to_iso8601(today()) +null from db.ntb") - tdSql.checkRows(3) - tdSql.checkData(0,0,None) - tdSql.query("select to_iso8601(today()) -null from db.ntb") - tdSql.checkRows(3) - tdSql.checkData(0,0,None) - tdSql.query("select to_iso8601(today()) /null from db.ntb") - tdSql.checkRows(3) - tdSql.checkData(0,0,None) - # tdSql.query("select to_iso8601(-1) from ntb") + for i in ['+','-','*','/']: + tdSql.query(f"select to_iso8601(today()) {i}null from ntb") + tdSql.checkRows(3) + tdSql.checkData(0,0,None) + tdSql.query(f"select to_iso8601(today()) {i}null from db.ntb") + tdSql.checkRows(3) + tdSql.checkData(0,0,None) tdSql.query("select to_iso8601(9223372036854775807) from ntb") tdSql.checkRows(3) - # bug TD-14896 + # bug TD-15207 # tdSql.query("select to_iso8601(10000000000) from ntb") # tdSql.checkData(0,0,None) # tdSql.query("select to_iso8601(-1) from ntb") # tdSql.checkRows(3) # tdSql.query("select to_iso8601(-10000000000) from ntb") # tdSql.checkData(0,0,None) - tdSql.error("select to_iso8601(1.5) from ntb") - tdSql.error("select to_iso8601(1.5) from db.ntb") - tdSql.error("select to_iso8601('a') from ntb") - tdSql.error("select to_iso8601(c2) from ntb") + err_param = [1.5,'a','c2'] + for i in err_param: + tdSql.error(f"select to_iso8601({i}) from ntb") + tdSql.error(f"select to_iso8601({i}) from db.ntb") tdSql.query("select to_iso8601(now) from stb") + tdSql.checkRows(3) tdSql.query("select to_iso8601(now()) from stb") tdSql.checkRows(3) + tdSql.query("select to_iso8601(1) from stb") for i in range(1,10): - tdSql.query("select to_iso8601(1) from stb") - tdSql.checkData(0,0,"1970-01-01T08:00:01+0800") - i+=1 - sleep(0.2) + tdSql.checkData(i,0,"1970-01-01T08:00:01+0800") tdSql.checkRows(3) tdSql.query("select to_iso8601(ts) from stb") tdSql.checkRows(3) @@ -121,38 +127,21 @@ class TDTestCase: tdSql.checkRows(3) tdSql.query("select to_iso8601(ts)+'a' from stb ") tdSql.checkRows(3) - - tdSql.query("select to_iso8601(today()) *null from stb") - tdSql.checkRows(3) - tdSql.checkData(0,0,None) - tdSql.query("select to_iso8601(today()) +null from stb") - tdSql.checkRows(3) - tdSql.checkData(0,0,None) - tdSql.query("select to_iso8601(today()) -null from stb") - tdSql.checkRows(3) - tdSql.checkData(0,0,None) - tdSql.query("select to_iso8601(today()) /null from stb") - tdSql.checkRows(3) - tdSql.checkData(0,0,None) - tdSql.query("select to_iso8601(today()) *null from db.stb") - tdSql.checkRows(3) - tdSql.checkData(0,0,None) - tdSql.query("select to_iso8601(today()) +null from db.stb") - tdSql.checkRows(3) - tdSql.checkData(0,0,None) - tdSql.query("select to_iso8601(today()) -null from db.stb") - tdSql.checkRows(3) - tdSql.checkData(0,0,None) - tdSql.query("select to_iso8601(today()) /null from db.stb") - tdSql.checkRows(3) - tdSql.checkData(0,0,None) + for i in ['+','-','*','/']: + tdSql.query("select to_iso8601(today()) {i}null from stb") + tdSql.checkRows(3) + tdSql.checkData(0,0,None) + tdSql.query("select to_iso8601(today()) {i}null from db.stb") + tdSql.checkRows(3) + tdSql.checkData(0,0,None) - # bug TD-14896 + # bug TD-15207 # tdSql.query("select to_iso8601(-1) from ntb") # tdSql.checkRows(3) - - - + + def run(self): # sourcery skip: extract-duplicate-method + self.check_base_function() + self.check_customize_param_ms() def stop(self): tdSql.close() tdLog.success(f"{__file__} successfully executed") From b0884ca9cfa1cbf2b1be8e9d52d2e6b6aece969d Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Thu, 9 Jun 2022 15:32:42 +0800 Subject: [PATCH 03/30] update case to support tag compute --- tests/system-test/2-query/abs.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/tests/system-test/2-query/abs.py b/tests/system-test/2-query/abs.py index b1600b4ee1..30af755f05 100644 --- a/tests/system-test/2-query/abs.py +++ b/tests/system-test/2-query/abs.py @@ -522,6 +522,29 @@ class TDTestCase: self.check_result_auto( "select c1+2, t2+2, t3 , t4, t5 from ct4 ", "select (c1)+2, abs(t2)+2 ,abs(t3), abs(t4), abs(t5) from ct4") self.check_result_auto( "select c1+2, t2+2, t3 , t4, t5 from stb1 order by t1 ", "select (c1)+2, abs(t2)+2 ,abs(t3), abs(t4), abs(t5) from stb1 order by t1") + # bug need fix + + # tdSql.query(" select sum(c1) from stb1 where t1+10 >1; ") # taosd crash + tdSql.query("select c1 ,t1 from stb1 where t1 =0 ") + tdSql.checkRows(13) + # tdSql.query("select t1 from stb1 where t1 >0 ") + # tdSql.checkRows(3) + # tdSql.query("select sum(t1) from (select c1 ,t1 from stb1)") + # tdSql.checkData(0,0,61) + # tdSql.query("select distinct(c1) ,t1 from stb1") + # tdSql.checkRows(11) + # tdSql.query("select max(t2) , t1 ,c1, t2 from stb1") + # tdSql.checkData(0,3,33333) + + # tag filter with abs function + tdSql.error("select t1 from stb1 where abs(t1)=1") + tdSql.query("select t1 from stb1 where abs(c1+t1)=1") + tdSql.checkRows(2) + # tdSql.query("select t1 from stb1 where abs(t1+c1)=1") + # tdSql.checkRows(2) + tdSql.query("select abs(c1+t1)*t1 from stb1 where abs(c1)/floor(abs(ceil(t1))) ==1") + + def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring tdSql.prepare() From 89b588e6d9fb4de5087c30b4f0f23d8648a2d8c0 Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Thu, 9 Jun 2022 16:14:37 +0800 Subject: [PATCH 04/30] update case --- tests/system-test/2-query/abs.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/system-test/2-query/abs.py b/tests/system-test/2-query/abs.py index 30af755f05..f67aa4c605 100644 --- a/tests/system-test/2-query/abs.py +++ b/tests/system-test/2-query/abs.py @@ -537,7 +537,8 @@ class TDTestCase: # tdSql.checkData(0,3,33333) # tag filter with abs function - tdSql.error("select t1 from stb1 where abs(t1)=1") + tdSql.query("select t1 from stb1 where abs(t1)=1") + tdSql.checkRows(4) tdSql.query("select t1 from stb1 where abs(c1+t1)=1") tdSql.checkRows(2) # tdSql.query("select t1 from stb1 where abs(t1+c1)=1") From cda7463a2de07d3361335ed95f49a7135e34ded2 Mon Sep 17 00:00:00 2001 From: jiacy-jcy Date: Thu, 9 Jun 2022 17:14:13 +0800 Subject: [PATCH 05/30] update test cases --- tests/system-test/1-insert/alter_stable.py | 6 +- tests/system-test/1-insert/alter_table.py | 100 +++++++++++++++++++++ tests/system-test/2-query/To_iso8601.py | 45 ++++------ 3 files changed, 121 insertions(+), 30 deletions(-) diff --git a/tests/system-test/1-insert/alter_stable.py b/tests/system-test/1-insert/alter_stable.py index e8c3544152..c92efb403c 100644 --- a/tests/system-test/1-insert/alter_stable.py +++ b/tests/system-test/1-insert/alter_stable.py @@ -122,10 +122,10 @@ class TDTestCase: for i in ['bigint','unsigned int','float','double','binary(10)','nchar(10)']: for j in [1,2,3]: - tdSql.error(f'alter table {stbname} modify tag t{j} {i}') + tdSql.error(f'alter stable {stbname} modify tag t{j} {i}') for i in ['int','unsigned int','float','binary(10)','nchar(10)']: - tdSql.error(f'alter table {stbname} modify tag t8 {i}') - tdSql.error(f'alter table {stbname} modify tag t4 int') + tdSql.error(f'alter stable {stbname} modify tag t8 {i}') + tdSql.error(f'alter stable {stbname} modify tag t4 int') tdSql.execute(f'drop database {dbname}') def run(self): diff --git a/tests/system-test/1-insert/alter_table.py b/tests/system-test/1-insert/alter_table.py index 53e3793dcf..99e673d761 100644 --- a/tests/system-test/1-insert/alter_table.py +++ b/tests/system-test/1-insert/alter_table.py @@ -158,9 +158,109 @@ class TDTestCase: tdSql.error(f'alter table {dbname}.{tbname} modify column c1 bool') tdSql.error(f'alter table {dbname}.{tbname} modify column c1 binary(10)') tdSql.execute(f'drop database {dbname}') + def alter_stb_column_check(self): + dbname = self.get_long_name(length=10, mode="letters") + tdSql.execute(f'create database if not exists {dbname}') + stbname = self.get_long_name(length=3, mode="letters") + tbname = self.get_long_name(length=3, mode="letters") + tdSql.execute(f'create database if not exists {dbname}') + tdSql.execute(f'use {dbname}') + tdSql.execute( + f'create table {stbname} (ts timestamp, c1 tinyint, c2 smallint, c3 int, \ + c4 bigint, c5 tinyint unsigned, c6 smallint unsigned, c7 int unsigned, c8 bigint unsigned, c9 float, c10 double, c11 bool,c12 binary(20),c13 nchar(20)) tags(t0 int) ') + tdSql.execute(f'create table {tbname} using {stbname} tags(1)') + tdSql.execute(f'insert into {tbname} values (now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据")') + tdSql.execute(f'alter table {stbname} add column c14 int') + tdSql.query(f'select c14 from {stbname}') + tdSql.checkRows(1) + tdSql.execute(f'alter table {stbname} add column `c15` int') + tdSql.query(f'select c15 from {stbname}') + tdSql.checkRows(1) + tdSql.query(f'describe {stbname}') + tdSql.checkRows(17) + tdSql.execute(f'alter table {stbname} drop column c14') + tdSql.query(f'describe {stbname}') + tdSql.checkRows(16) + tdSql.execute(f'alter table {stbname} drop column `c15`') + tdSql.query(f'describe {stbname}') + tdSql.checkRows(15) + tdSql.execute(f'alter table {stbname} modify column c12 binary(30)') + tdSql.query(f'describe {stbname}') + tdSql.checkData(12,2,30) + tdSql.execute(f'alter table {stbname} modify column `c12` binary(35)') + tdSql.query(f'describe {stbname}') + tdSql.checkData(12,2,35) + tdSql.error(f'alter table {stbname} modify column `c12` binary(34)') + tdSql.execute(f'alter table {stbname} modify column c13 nchar(30)') + tdSql.query(f'describe {stbname}') + tdSql.checkData(13,2,30) + tdSql.error(f'alter table {stbname} modify column c13 nchar(29)') + tdSql.error(f'alter table {stbname} rename column c1 c21') + tdSql.error(f'alter table {stbname} modify column c1 int') + tdSql.error(f'alter table {stbname} modify column c4 int') + tdSql.error(f'alter table {stbname} modify column c8 int') + tdSql.error(f'alter table {stbname} modify column c1 unsigned int') + tdSql.error(f'alter table {stbname} modify column c9 double') + tdSql.error(f'alter table {stbname} modify column c10 float') + tdSql.error(f'alter table {stbname} modify column c11 int') + tdSql.execute(f'drop database {dbname}') + def alter_stb_tag_check(self): + dbname = self.get_long_name(length=10, mode="letters") + tdSql.execute(f'create database if not exists {dbname}') + stbname = self.get_long_name(length=3, mode="letters") + tbname = self.get_long_name(length=3, mode="letters") + tdSql.execute(f'create database if not exists {dbname}') + tdSql.execute(f'use {dbname}') + tdSql.execute( + f'create table {stbname} (ts timestamp, c1 int) tags(ts_tag timestamp, t1 tinyint, t2 smallint, t3 int, \ + t4 bigint, t5 tinyint unsigned, t6 smallint unsigned, t7 int unsigned, t8 bigint unsigned, t9 float, t10 double, t11 bool,t12 binary(20),t13 nchar(20)) ') + tdSql.execute(f'create table {tbname} using {stbname} tags(now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据")') + tdSql.execute(f'insert into {tbname} values(now,1)') + + tdSql.execute(f'alter table {stbname} add tag t14 int') + tdSql.query(f'select t14 from {stbname}') + tdSql.checkRows(1) + tdSql.execute(f'alter table {stbname} add tag `t15` int') + tdSql.query(f'select t14 from {stbname}') + tdSql.checkRows(1) + tdSql.query(f'describe {stbname}') + tdSql.checkRows(18) + tdSql.execute(f'alter table {stbname} drop tag t14') + tdSql.query(f'describe {stbname}') + tdSql.checkRows(17) + tdSql.execute(f'alter table {stbname} drop tag `t15`') + tdSql.query(f'describe {stbname}') + tdSql.checkRows(16) + tdSql.execute(f'alter table {stbname} modify tag t12 binary(30)') + tdSql.query(f'describe {stbname}') + tdSql.checkData(14,2,30) + tdSql.execute(f'alter table {stbname} modify tag `t12` binary(35)') + tdSql.query(f'describe {stbname}') + tdSql.checkData(14,2,35) + tdSql.error(f'alter table {stbname} modify tag `t12` binary(34)') + tdSql.execute(f'alter table {stbname} modify tag t13 nchar(30)') + tdSql.query(f'describe {stbname}') + tdSql.checkData(15,2,30) + tdSql.error(f'alter table {stbname} modify tag t13 nchar(29)') + tdSql.execute(f'alter table {stbname} rename tag t1 t21') + tdSql.query(f'describe {stbname}') + tdSql.checkData(3,0,'t21') + tdSql.execute(f'alter table {stbname} rename tag `t21` t1') + tdSql.query(f'describe {stbname}') + tdSql.checkData(3,0,'t1') + + for i in ['bigint','unsigned int','float','double','binary(10)','nchar(10)']: + for j in [1,2,3]: + tdSql.error(f'alter table {stbname} modify tag t{j} {i}') + for i in ['int','unsigned int','float','binary(10)','nchar(10)']: + tdSql.error(f'alter table {stbname} modify tag t8 {i}') + tdSql.error(f'alter table {stbname} modify tag t4 int') + tdSql.execute(f'drop database {dbname}') def run(self): self.alter_tb_tag_check() self.alter_ntb_column_check() + self.alter_stb_column_check() + self.alter_stb_tag_check() def stop(self): tdSql.close() diff --git a/tests/system-test/2-query/To_iso8601.py b/tests/system-test/2-query/To_iso8601.py index 32c9c216a0..f00d25bd21 100644 --- a/tests/system-test/2-query/To_iso8601.py +++ b/tests/system-test/2-query/To_iso8601.py @@ -20,39 +20,35 @@ class TDTestCase: # print(today_date) time_zone = (os.popen('timedatectl | grep zone').read().strip().split(',')[1].lstrip())[0:5] - print(time_zone) tdSql.execute('create database db1 precision "ms"') tdSql.execute('use db1') - tdSql.execute( - 'create table if not exists ntb(ts timestamp, c1 int, c2 timestamp)' - ) + tdSql.execute('create table if not exists ntb(ts timestamp, c1 int, c2 timestamp)') for i in range(self.rowNum): tdSql.execute("insert into ntb values(%d, %d, %d)" % (self.ts + i, i + 1, self.ts + i)) tdSql.query('select to_iso8601(ts) from ntb') - print(tdSql.queryResult) for i in range(self.rowNum): tdSql.checkEqual(tdSql.queryResult[i][0],f'2022-01-01T00:00:00.00{i}{time_zone}') - for j in ['z','Z']: - tdSql.query('select to_iso8601(ts,"{j}") from ntb') - print(tdSql.queryResult) - for i in range(self.rowNum): - tdSql.checkEqual(tdSql.queryResult[i][0],f'2022-01-01T00:00:00.00{i}{time_zone}') timezone_list = ['+0000','+0100','+0200','+0300','+0330','+0400','+0500','+0530','+0600','+0700','+0800','+0900','+1000','+1100','+1200',\ '+00','+01','+02','+03','+04','+05','+06','+07','+08','+09','+10','+11','+12',\ '+00:00','+01:00','+02:00','+03:00','+03:30','+04:00','+05:00','+05:30','+06:00','+07:00','+08:00','+09:00','+10:00','+11:00','+12:00',\ '-0000','-0100','-0200','-0300','-0400','-0500','-0600','-0700','-0800','-0900','-1000','-1100','-1200',\ '-00','-01','-02','-03','-04','-05','-06','-07','-08','-09','-10','-11','-12',\ - '-00:00','-01:00','-02:00','-03:00','-04:00','-05:00','-06:00','-07:00','-08:00','-09:00','-10:00','-11:00','-12:00'] + '-00:00','-01:00','-02:00','-03:00','-04:00','-05:00','-06:00','-07:00','-08:00','-09:00','-10:00','-11:00','-12:00'\ + 'z','Z'] for j in timezone_list: tdSql.query(f'select to_iso8601(ts,"{j}") from ntb') for i in range(self.rowNum): - tdSql.checkEqual(tdSql.queryResult[i][0],f'2022-01-01T00:00:00.00{i}{time_zone}') - - error_param_list = [0,100,5,'a','+13','+0101','+00:01','-0001','-13','-00:01'] + tdSql.checkEqual(tdSql.queryResult[i][0],f'2022-01-01T00:00:00.00{i}{j}') + + error_param_list = [0,100.5,'a','!'] for i in error_param_list: - tdSql.error(f'select to_iso8601(ts,"{error_param_list}") from ntb') + tdSql.error(f'select to_iso8601(ts,"{i}") from ntb') + # bug TD-16372:对于错误的时区,缺少校验 + # error_timezone_param = ['+13','-13','+1300','-1300','+0001','-0001','-0330','+0330'] + # for i in error_timezone_param: + # tdSql.error(f'select to_iso8601(ts,"{i}") from ntb') def check_base_function(self): tdSql.prepare() @@ -74,12 +70,9 @@ class TDTestCase: tdSql.checkRows(1) tdSql.query("select to_iso8601(ts) from ntb where ts=today()") tdSql.checkRows(1) - # tdSql.checkData(0,0,10) - for i in range(1,10): + for i in range(0,3): tdSql.query("select to_iso8601(1) from ntb") - tdSql.checkData(0,0,"1970-01-01T08:00:01+0800") - i+=1 - sleep(0.2) + tdSql.checkData(i,0,"1970-01-01T08:00:01+0800") tdSql.checkRows(3) tdSql.query("select to_iso8601(ts) from ntb") tdSql.checkRows(3) @@ -113,12 +106,13 @@ class TDTestCase: for i in err_param: tdSql.error(f"select to_iso8601({i}) from ntb") tdSql.error(f"select to_iso8601({i}) from db.ntb") + tdSql.query("select to_iso8601(now) from stb") tdSql.checkRows(3) tdSql.query("select to_iso8601(now()) from stb") tdSql.checkRows(3) tdSql.query("select to_iso8601(1) from stb") - for i in range(1,10): + for i in range(0,3): tdSql.checkData(i,0,"1970-01-01T08:00:01+0800") tdSql.checkRows(3) tdSql.query("select to_iso8601(ts) from stb") @@ -128,20 +122,17 @@ class TDTestCase: tdSql.query("select to_iso8601(ts)+'a' from stb ") tdSql.checkRows(3) for i in ['+','-','*','/']: - tdSql.query("select to_iso8601(today()) {i}null from stb") + tdSql.query(f"select to_iso8601(today()) {i}null from stb") tdSql.checkRows(3) tdSql.checkData(0,0,None) - tdSql.query("select to_iso8601(today()) {i}null from db.stb") + tdSql.query(f"select to_iso8601(today()) {i}null from db.stb") tdSql.checkRows(3) tdSql.checkData(0,0,None) - - # bug TD-15207 - # tdSql.query("select to_iso8601(-1) from ntb") - # tdSql.checkRows(3) def run(self): # sourcery skip: extract-duplicate-method self.check_base_function() self.check_customize_param_ms() + def stop(self): tdSql.close() tdLog.success(f"{__file__} successfully executed") From 5b840c4be19b0b534249b7c0173818d1dd25cebf Mon Sep 17 00:00:00 2001 From: jiacy-jcy Date: Thu, 9 Jun 2022 17:15:18 +0800 Subject: [PATCH 06/30] update --- tests/system-test/2-query/To_iso8601.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/2-query/To_iso8601.py b/tests/system-test/2-query/To_iso8601.py index f00d25bd21..b9cf1ce9a7 100644 --- a/tests/system-test/2-query/To_iso8601.py +++ b/tests/system-test/2-query/To_iso8601.py @@ -35,7 +35,7 @@ class TDTestCase: '+00:00','+01:00','+02:00','+03:00','+03:30','+04:00','+05:00','+05:30','+06:00','+07:00','+08:00','+09:00','+10:00','+11:00','+12:00',\ '-0000','-0100','-0200','-0300','-0400','-0500','-0600','-0700','-0800','-0900','-1000','-1100','-1200',\ '-00','-01','-02','-03','-04','-05','-06','-07','-08','-09','-10','-11','-12',\ - '-00:00','-01:00','-02:00','-03:00','-04:00','-05:00','-06:00','-07:00','-08:00','-09:00','-10:00','-11:00','-12:00'\ + '-00:00','-01:00','-02:00','-03:00','-04:00','-05:00','-06:00','-07:00','-08:00','-09:00','-10:00','-11:00','-12:00',\ 'z','Z'] for j in timezone_list: tdSql.query(f'select to_iso8601(ts,"{j}") from ntb') From b442c771fcc82aa43ed1f77179de1ae869d6b12f Mon Sep 17 00:00:00 2001 From: jiacy-jcy Date: Fri, 10 Jun 2022 09:18:03 +0800 Subject: [PATCH 07/30] update test case --- tests/system-test/2-query/To_iso8601.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/2-query/To_iso8601.py b/tests/system-test/2-query/To_iso8601.py index b9cf1ce9a7..46d368aac4 100644 --- a/tests/system-test/2-query/To_iso8601.py +++ b/tests/system-test/2-query/To_iso8601.py @@ -19,7 +19,7 @@ class TDTestCase: # datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d") # print(today_date) - time_zone = (os.popen('timedatectl | grep zone').read().strip().split(',')[1].lstrip())[0:5] + time_zone = os.popen('date "+%z"').read().strip() tdSql.execute('create database db1 precision "ms"') tdSql.execute('use db1') tdSql.execute('create table if not exists ntb(ts timestamp, c1 int, c2 timestamp)') From 58ab0549f6ddf271efca463d57cf068ea9116f14 Mon Sep 17 00:00:00 2001 From: jiacy-jcy Date: Fri, 10 Jun 2022 11:03:17 +0430 Subject: [PATCH 08/30] update test case --- tests/system-test/2-query/To_iso8601.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/system-test/2-query/To_iso8601.py b/tests/system-test/2-query/To_iso8601.py index 46d368aac4..4051f46866 100644 --- a/tests/system-test/2-query/To_iso8601.py +++ b/tests/system-test/2-query/To_iso8601.py @@ -46,9 +46,9 @@ class TDTestCase: for i in error_param_list: tdSql.error(f'select to_iso8601(ts,"{i}") from ntb') # bug TD-16372:对于错误的时区,缺少校验 - # error_timezone_param = ['+13','-13','+1300','-1300','+0001','-0001','-0330','+0330'] - # for i in error_timezone_param: - # tdSql.error(f'select to_iso8601(ts,"{i}") from ntb') + error_timezone_param = ['+13','-13','+1300','-1300','+0001','-0001','-0330','+0330'] + for i in error_timezone_param: + tdSql.error(f'select to_iso8601(ts,"{i}") from ntb') def check_base_function(self): tdSql.prepare() From 334a53dab341260acc91c0a6d49a079608f2ee81 Mon Sep 17 00:00:00 2001 From: jiacy-jcy Date: Fri, 10 Jun 2022 11:04:55 +0430 Subject: [PATCH 09/30] update test case --- tests/system-test/2-query/To_iso8601.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/system-test/2-query/To_iso8601.py b/tests/system-test/2-query/To_iso8601.py index 4051f46866..b90c1222a5 100644 --- a/tests/system-test/2-query/To_iso8601.py +++ b/tests/system-test/2-query/To_iso8601.py @@ -15,9 +15,6 @@ class TDTestCase: self.rowNum = 10 self.ts = 1640966400000 # 2022-1-1 00:00:00.000 def check_customize_param_ms(self): - # today_date = datetime.datetime.strptime( - # datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d") - # print(today_date) time_zone = os.popen('date "+%z"').read().strip() tdSql.execute('create database db1 precision "ms"') @@ -45,7 +42,7 @@ class TDTestCase: error_param_list = [0,100.5,'a','!'] for i in error_param_list: tdSql.error(f'select to_iso8601(ts,"{i}") from ntb') - # bug TD-16372:对于错误的时区,缺少校验 + #! bug TD-16372:对于错误的时区,缺少校验 error_timezone_param = ['+13','-13','+1300','-1300','+0001','-0001','-0330','+0330'] for i in error_timezone_param: tdSql.error(f'select to_iso8601(ts,"{i}") from ntb') From 8b5269795b959c1ac71d5ae0f3754184aec7af8a Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Fri, 10 Jun 2022 16:16:57 +0800 Subject: [PATCH 10/30] update case --- tests/system-test/2-query/abs.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/system-test/2-query/abs.py b/tests/system-test/2-query/abs.py index f67aa4c605..a843feb827 100644 --- a/tests/system-test/2-query/abs.py +++ b/tests/system-test/2-query/abs.py @@ -538,11 +538,11 @@ class TDTestCase: # tag filter with abs function tdSql.query("select t1 from stb1 where abs(t1)=1") - tdSql.checkRows(4) + tdSql.checkRows(1) tdSql.query("select t1 from stb1 where abs(c1+t1)=1") - tdSql.checkRows(2) + tdSql.checkRows(1) # tdSql.query("select t1 from stb1 where abs(t1+c1)=1") - # tdSql.checkRows(2) + # tdSql.checkRows(1) tdSql.query("select abs(c1+t1)*t1 from stb1 where abs(c1)/floor(abs(ceil(t1))) ==1") From 63269c19de77c016adbf5d36f71b8e6d3724f0bb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Jun 2022 16:45:26 +0800 Subject: [PATCH 11/30] fix(query): fix invalid write. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 5 +++++ source/libs/executor/src/executorimpl.c | 25 ++++++------------------- 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 4a433a557b..d2c60047d7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3163,6 +3163,11 @@ bool tsdbNextDataBlock(tsdbReaderT pHandle) { size_t numOfCols = taosArrayGetSize(pTsdbReadHandle->pColumns); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); + int32_t code = colInfoDataEnsureCapacity(pColInfo, 0, pTsdbReadHandle->outputCapacity); + if (code != TSDB_CODE_SUCCESS) { + // todo handle error + ASSERT(0); + } colInfoDataCleanup(pColInfo, pTsdbReadHandle->outputCapacity); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 13f1b976dc..9a002d2d2f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1819,7 +1819,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO } } -static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep, bool needFree); +static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, bool needFree) { if (pFilterNode == NULL) { @@ -1840,11 +1840,11 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, bool needFree) { bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols); filterFreeInfo(filter); - extractQualifiedTupleByFilterResult(pBlock, rowRes, keep, needFree); + extractQualifiedTupleByFilterResult(pBlock, rowRes, keep); blockDataUpdateTsWindow(pBlock, 0); } -void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep, bool needFree) { +void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) { if (keep) { return; } @@ -1884,17 +1884,9 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR ASSERT(pBlock->info.rows == numOfRows); } - SColumnInfoData tmp = *pSrc; - *pSrc = *pDst; - *pDst = tmp; - - if (!needFree) { - if (IS_VAR_DATA_TYPE(pDst->info.type)) { // this elements do not need free - pDst->varmeta.offset = NULL; - } else { - pDst->nullbitmap = NULL; - } - pDst->pData = NULL; + // write back + if (pBlock->info.rows > 0) { + colDataAssign(pSrc, pDst, pBlock->info.rows); } } blockDataDestroy(px); // fix memory leak @@ -2086,11 +2078,6 @@ static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows, } int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t capacity) { - // for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { - // SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i); - // p[i] = pColInfoData->pData + (pColInfoData->info.bytes * pOutput->info.rows); - // } - int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, pBlock, capacity - pBlock->info.rows); pBlock->info.rows += numOfRows; From 516750bd42c4b374fbdd4b8f35905adf784560d4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Jun 2022 23:58:17 +0800 Subject: [PATCH 12/30] refactor: remove unnecessary check. --- source/libs/executor/src/executorimpl.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9a002d2d2f..368f6c3aba 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1885,9 +1885,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR } // write back - if (pBlock->info.rows > 0) { - colDataAssign(pSrc, pDst, pBlock->info.rows); - } + colDataAssign(pSrc, pDst, pBlock->info.rows); } blockDataDestroy(px); // fix memory leak } else { From afe612e369a2f6fd9544a1d34b23d4d10ab8322c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 11 Jun 2022 00:51:59 +0800 Subject: [PATCH 13/30] refactor: remove unnecessary check. --- source/common/src/tdatablock.c | 3 ++- source/dnode/vnode/src/tsdb/tsdbRead.c | 5 ----- source/libs/executor/src/executorimpl.c | 10 ++++------ 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 66b12678ab..38be0b8f0a 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -294,7 +294,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, in int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows) { ASSERT(pColumnInfoData != NULL && pSource != NULL && pColumnInfoData->info.type == pSource->info.type); - if (numOfRows == 0) { + if (numOfRows <= 0) { return numOfRows; } @@ -322,6 +322,7 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p pColumnInfoData->varmeta.length = pSource->varmeta.length; } else { char* tmp = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(numOfRows)); + printf("----------------%d\n", BitmapLen(numOfRows)); if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index d2c60047d7..4a433a557b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3163,11 +3163,6 @@ bool tsdbNextDataBlock(tsdbReaderT pHandle) { size_t numOfCols = taosArrayGetSize(pTsdbReadHandle->pColumns); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); - int32_t code = colInfoDataEnsureCapacity(pColInfo, 0, pTsdbReadHandle->outputCapacity); - if (code != TSDB_CODE_SUCCESS) { - // todo handle error - ASSERT(0); - } colInfoDataCleanup(pColInfo, pTsdbReadHandle->outputCapacity); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 368f6c3aba..69a61b413d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1850,14 +1850,12 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR } if (rowRes != NULL) { - SSDataBlock* px = createOneDataBlock(pBlock, false); - blockDataEnsureCapacity(px, pBlock->info.rows); + SSDataBlock* px = createOneDataBlock(pBlock, true); int32_t totalRows = pBlock->info.rows; - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i); - SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i); + SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i); + SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); // it is a reserved column for scalar function, and no data in this column yet. if (pSrc->pData == NULL) { @@ -1885,7 +1883,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR } // write back - colDataAssign(pSrc, pDst, pBlock->info.rows); +// colDataAssign(pSrc, pDst, pBlock->info.rows); } blockDataDestroy(px); // fix memory leak } else { From ed71b7afe7179af08d51a99db9b3ec5235bb7b46 Mon Sep 17 00:00:00 2001 From: jiacy-jcy Date: Sat, 11 Jun 2022 10:17:58 +0800 Subject: [PATCH 14/30] update --- tests/system-test/1-insert/alter_table.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tests/system-test/1-insert/alter_table.py b/tests/system-test/1-insert/alter_table.py index 99e673d761..3c0def86e4 100644 --- a/tests/system-test/1-insert/alter_table.py +++ b/tests/system-test/1-insert/alter_table.py @@ -145,10 +145,17 @@ class TDTestCase: tdSql.execute(f'alter table {dbname}.{tbname} rename column c1 c21') tdSql.query(f'describe {dbname}.{tbname}') tdSql.checkData(1,0,'c21') + # !bug TD-16423 + # tdSql.error(f'select c1 from {dbname}.{tbname}') + # tdSql.query(f'select c21 from {dbname}.{tbname}') + # tdSql.checkData(0,1,1) tdSql.execute(f'alter table {dbname}.{tbname} rename column `c21` c1') tdSql.query(f'describe {dbname}.{tbname}') tdSql.checkData(1,0,'c1') - + # !bug TD-16423 + # tdSql.error(f'select c1 from {dbname}.{tbname}') + # tdSql.query(f'select c1 from {dbname}.{tbname}') + # tdSql.checkData(0,1,1) tdSql.error(f'alter table {dbname}.{tbname} modify column c1 bigint') tdSql.error(f'alter table {dbname}.{tbname} modify column c1 double') tdSql.error(f'alter table {dbname}.{tbname} modify column c4 int') From 41eddcecb6c6d6a13e0116ba9b28e102369fb70a Mon Sep 17 00:00:00 2001 From: jiacy-jcy Date: Sat, 11 Jun 2022 10:27:14 +0800 Subject: [PATCH 15/30] update --- tests/system-test/2-query/To_iso8601.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/2-query/To_iso8601.py b/tests/system-test/2-query/To_iso8601.py index b90c1222a5..973e1e49eb 100644 --- a/tests/system-test/2-query/To_iso8601.py +++ b/tests/system-test/2-query/To_iso8601.py @@ -43,7 +43,7 @@ class TDTestCase: for i in error_param_list: tdSql.error(f'select to_iso8601(ts,"{i}") from ntb') #! bug TD-16372:对于错误的时区,缺少校验 - error_timezone_param = ['+13','-13','+1300','-1300','+0001','-0001','-0330','+0330'] + error_timezone_param = ['+13','-13','+1300','-1300','+0001','-0001','-0330','-0530'] for i in error_timezone_param: tdSql.error(f'select to_iso8601(ts,"{i}") from ntb') From 95ca88432f92e586956ef452ac65e02c38d8a0e5 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 11 Jun 2022 10:58:57 +0800 Subject: [PATCH 16/30] fix: save sync config in mnode --- source/dnode/mnode/sdb/src/sdb.c | 5 ++++- source/dnode/mnode/sdb/src/sdbFile.c | 4 ++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index c11aaaaa8a..060bba0ce4 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -163,7 +163,10 @@ void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; } void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->curTerm = term; } -void sdbSetCurConfig(SSdb *pSdb, int64_t config) { pSdb->curConfig = config; } +void sdbSetCurConfig(SSdb *pSdb, int64_t config) { + mInfo("mnode sync config set from %" PRId64 " to %" PRId64, pSdb->curConfig, config); + pSdb->curConfig = config; +} int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; } diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index b32abc3eaa..9a0afb51ea 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -432,8 +432,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { } else { pSdb->lastCommitVer = pSdb->curVer; pSdb->lastCommitTerm = pSdb->curTerm; - mDebug("write sdb file successfully, ver:%" PRId64 " term:%" PRId64 " file:%s", pSdb->lastCommitVer, - pSdb->lastCommitTerm, curfile); + mDebug("write sdb file successfully, index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s", + pSdb->lastCommitVer, pSdb->lastCommitTerm, pSdb->curConfig, curfile); } terrno = code; From 514e86ff26dde68298747e3d49f79d6829f3397a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 11 Jun 2022 11:28:59 +0800 Subject: [PATCH 17/30] fix(query): clear flag before assign data. --- source/libs/executor/src/executorimpl.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 69a61b413d..eda24cb0d6 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1856,12 +1856,13 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i); SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); - // it is a reserved column for scalar function, and no data in this column yet. if (pSrc->pData == NULL) { continue; } + colInfoDataCleanup(pDst, pBlock->info.rows); + int32_t numOfRows = 0; for (int32_t j = 0; j < totalRows; ++j) { if (rowRes[j] == 0) { From 7eac8e009db9b5a78a4c011653a34ee658b8ceaa Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Sat, 11 Jun 2022 11:35:33 +0800 Subject: [PATCH 18/30] update case abs for tag compute --- tests/system-test/2-query/abs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system-test/2-query/abs.py b/tests/system-test/2-query/abs.py index a843feb827..d9c37be996 100644 --- a/tests/system-test/2-query/abs.py +++ b/tests/system-test/2-query/abs.py @@ -537,8 +537,8 @@ class TDTestCase: # tdSql.checkData(0,3,33333) # tag filter with abs function - tdSql.query("select t1 from stb1 where abs(t1)=1") - tdSql.checkRows(1) + # tdSql.query("select t1 from stb1 where abs(t1)=1") + # tdSql.checkRows(1) tdSql.query("select t1 from stb1 where abs(c1+t1)=1") tdSql.checkRows(1) # tdSql.query("select t1 from stb1 where abs(t1+c1)=1") From 79552ed65d0d7533c48f37ac6312733fc65f11e8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 11 Jun 2022 11:35:42 +0800 Subject: [PATCH 19/30] fix(query): add null pointer check. --- source/common/src/tdatablock.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 38be0b8f0a..61b79bb1df 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1233,6 +1233,10 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); + if (pSrc->pData== NULL) { + continue; + } + int32_t code = colInfoDataEnsureCapacity(pDst, 0, pDataBlock->info.rows); if (code != TSDB_CODE_SUCCESS) { return NULL; From f30ae95b9dcf1f8a6da1618e03424f95792e9326 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 11 Jun 2022 11:36:13 +0800 Subject: [PATCH 20/30] fix(query): add null pointer check. --- source/common/src/tdatablock.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 61b79bb1df..04de2738e1 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1233,15 +1233,14 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); - if (pSrc->pData== NULL) { - continue; - } - int32_t code = colInfoDataEnsureCapacity(pDst, 0, pDataBlock->info.rows); if (code != TSDB_CODE_SUCCESS) { return NULL; } + if (pSrc->pData== NULL) { + continue; + } colDataAssign(pDst, pSrc, pDataBlock->info.rows); } From d702cb1486fa3353294a2ef0b6a34a7c800e921d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 11 Jun 2022 12:55:16 +0800 Subject: [PATCH 21/30] refactor: adjust mnode log --- source/dnode/mnode/impl/src/mndSync.c | 4 ++-- source/dnode/mnode/impl/src/mndTrans.c | 6 +++--- source/dnode/mnode/sdb/src/sdb.c | 6 ++++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index a0f722b3d2..adc86df829 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -200,14 +200,14 @@ int32_t mndInitSync(SMnode *pMnode) { return -1; } - mDebug("mnode sync is opened, id:%" PRId64, pMgmt->sync); + mDebug("mnode-sync is opened, id:%" PRId64, pMgmt->sync); return 0; } void mndCleanupSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; syncStop(pMgmt->sync); - mDebug("mnode sync is stopped, id:%" PRId64, pMgmt->sync); + mDebug("mnode-sync is stopped, id:%" PRId64, pMgmt->sync); tsem_destroy(&pMgmt->syncSem); memset(pMgmt, 0, sizeof(SSyncMgmt)); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index a689c89037..310f2fffbc 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -297,7 +297,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) action.pRaw = taosMemoryMalloc(dataLen); if (action.pRaw == NULL) goto _OVER; - mTrace("raw:%p, is created", pData); + // mTrace("raw:%p, is created", pData); SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER); if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER; action.pRaw = NULL; @@ -330,7 +330,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) action.pRaw = taosMemoryMalloc(dataLen); if (action.pRaw == NULL) goto _OVER; - mTrace("raw:%p, is created", pData); + // mTrace("raw:%p, is created", action.pRaw); SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER); if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER; action.pRaw = NULL; @@ -363,7 +363,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) action.pRaw = taosMemoryMalloc(dataLen); if (action.pRaw == NULL) goto _OVER; - mTrace("raw:%p, is created", action.pRaw); + // mTrace("raw:%p, is created", action.pRaw); SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER); if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER; action.pRaw = NULL; diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 060bba0ce4..39e9c75888 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -164,8 +164,10 @@ void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; } void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->curTerm = term; } void sdbSetCurConfig(SSdb *pSdb, int64_t config) { - mInfo("mnode sync config set from %" PRId64 " to %" PRId64, pSdb->curConfig, config); - pSdb->curConfig = config; + if (pSdb->curConfig != config) { + mDebug("mnode sync config set from %" PRId64 " to %" PRId64, pSdb->curConfig, config); + pSdb->curConfig = config; + } } int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; } From 3fc793dbe03e45614aec512388de00d0715b9256 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 11 Jun 2022 13:09:45 +0800 Subject: [PATCH 22/30] enh: save dnodeVer in dnodeEps.json --- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 12 +++++--- source/dnode/mgmt/node_util/src/dmEps.c | 31 ++++++++------------- source/dnode/mnode/impl/src/mndDnode.c | 4 +-- 3 files changed, 22 insertions(+), 25 deletions(-) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index fbd46db183..3e55469a4a 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -38,9 +38,13 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { SStatusRsp statusRsp = {0}; if (pRsp->pCont != NULL && pRsp->contLen > 0 && tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) { - pMgmt->pData->dnodeVer = statusRsp.dnodeVer; - dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg); - dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps); + dTrace("status msg received from mnode, dnodeVer:%" PRId64 " saved:%" PRId64, statusRsp.dnodeVer, + pMgmt->pData->dnodeVer); + if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) { + pMgmt->pData->dnodeVer = statusRsp.dnodeVer; + dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg); + dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps); + } } rpcFreeCont(pRsp->pCont); tFreeSStatusRsp(&statusRsp); @@ -89,7 +93,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .info.ahandle = (void *)0x9527}; SRpcMsg rpcRsp = {0}; - dTrace("send status msg to mnode"); + dTrace("send status msg to mnode, dnodeVer:%" PRId64, req.dnodeVer); SEpSet epSet = {0}; dmGetMnodeEpSet(pMgmt->pData, &epSet); diff --git a/source/dnode/mgmt/node_util/src/dmEps.c b/source/dnode/mgmt/node_util/src/dmEps.c index 6f6896ff9f..096fc753b2 100644 --- a/source/dnode/mgmt/node_util/src/dmEps.c +++ b/source/dnode/mgmt/node_util/src/dmEps.c @@ -81,6 +81,13 @@ int32_t dmReadEps(SDnodeData *pData) { } pData->dnodeId = dnodeId->valueint; + cJSON *dnodeVer = cJSON_GetObjectItem(root, "dnodeVer"); + if (!dnodeVer || dnodeVer->type != cJSON_String) { + dError("failed to read %s since dnodeVer not found", file); + goto _OVER; + } + pData->dnodeVer = atoll(dnodeVer->valuestring); + cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); if (!clusterId || clusterId->type != cJSON_String) { dError("failed to read %s since clusterId not found", file); @@ -193,6 +200,7 @@ int32_t dmWriteEps(SDnodeData *pData) { len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pData->dnodeId); + len += snprintf(content + len, maxLen - len, " \"dnodeVer\": \"%" PRId64 "\",\n", pData->dnodeVer); len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pData->clusterId); len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pData->dropped); len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n"); @@ -224,30 +232,15 @@ int32_t dmWriteEps(SDnodeData *pData) { } pData->updateTime = taosGetTimestampMs(); - dDebug("successed to write %s", realfile); + dDebug("successed to write %s, dnodeVer:%" PRId64, realfile, pData->dnodeVer); return 0; } void dmUpdateEps(SDnodeData *pData, SArray *eps) { - int32_t numOfEps = taosArrayGetSize(eps); - if (numOfEps <= 0) return; - taosThreadRwlockWrlock(&pData->lock); - - int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pData->dnodeEps); - if (numOfEps != numOfEpsOld) { - dDebug("new dnode list get from mnode"); - dmResetEps(pData, eps); - dmWriteEps(pData); - } else { - int32_t size = numOfEps * sizeof(SDnodeEp); - if (memcmp(pData->dnodeEps->pData, eps->pData, size) != 0) { - dDebug("new dnode list get from mnode"); - dmResetEps(pData, eps); - dmWriteEps(pData); - } - } - + dDebug("new dnode list get from mnode, dnodeVer:%" PRId64, pData->dnodeVer); + dmResetEps(pData, eps); + dmWriteEps(pData); taosThreadRwlockUnlock(&pData->lock); } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 3fab870277..73eea70195 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -385,7 +385,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE); int64_t curMs = taosGetTimestampMs(); bool online = mndIsDnodeOnline(pDnode, curMs); - bool dnodeChanged = (statusReq.dnodeVer != dnodeVer); + bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer); bool reboot = (pDnode->rebootTime != statusReq.rebootTime); bool needCheck = !online || dnodeChanged || reboot; @@ -427,7 +427,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { if (!online) { mInfo("dnode:%d, from offline to online", pDnode->id); } else { - mDebug("dnode:%d, send dnode epset, online:%d dnode_ver:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online, + mDebug("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online, statusReq.dnodeVer, dnodeVer, reboot); } From 80b7ba902ced89ecfca85dcffd38793e55040d47 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 11 Jun 2022 13:48:18 +0800 Subject: [PATCH 23/30] fix(query): add null pointer check. --- source/common/src/tdatablock.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 04de2738e1..c1f2726629 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1238,7 +1238,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { return NULL; } - if (pSrc->pData== NULL) { + if (pSrc->pData== NULL || pDst->pData == NULL) { continue; } colDataAssign(pDst, pSrc, pDataBlock->info.rows); From 39af4cab123eeb6eba27d8ed08f10de1b9694dfa Mon Sep 17 00:00:00 2001 From: dingbo Date: Sat, 11 Jun 2022 13:42:15 +0800 Subject: [PATCH 24/30] docs: fix format error in rust.mdx --- docs-en/14-reference/03-connector/rust.mdx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs-en/14-reference/03-connector/rust.mdx b/docs-en/14-reference/03-connector/rust.mdx index cd54f35982..d3b6fa648b 100644 --- a/docs-en/14-reference/03-connector/rust.mdx +++ b/docs-en/14-reference/03-connector/rust.mdx @@ -45,7 +45,7 @@ Add the [libtaos][libtaos] dependency to the [Rust](https://rust-lang.org) proje -Add [libtaos][libtaos] to the ``Cargo.toml`'' file. +Add [libtaos][libtaos] to the `Cargo.toml` file. ``toml [dependencies] @@ -53,7 +53,7 @@ Add [libtaos][libtaos] to the ``Cargo.toml`'' file. libtaos = "*" ``` - Add [libtaos][libtaos] to the `Cargo.toml` file and enable the `rest` feature. From 5f5ba9811a7b049a0f3550bf8126749098dbc4a9 Mon Sep 17 00:00:00 2001 From: dingbo Date: Sat, 11 Jun 2022 13:57:05 +0800 Subject: [PATCH 25/30] docs: fix format error in rust.mdx --- docs-en/14-reference/03-connector/rust.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs-en/14-reference/03-connector/rust.mdx b/docs-en/14-reference/03-connector/rust.mdx index d3b6fa648b..a5cbaeac80 100644 --- a/docs-en/14-reference/03-connector/rust.mdx +++ b/docs-en/14-reference/03-connector/rust.mdx @@ -47,7 +47,7 @@ Add the [libtaos][libtaos] dependency to the [Rust](https://rust-lang.org) proje Add [libtaos][libtaos] to the `Cargo.toml` file. -``toml +```toml [dependencies] # use default feature libtaos = "*" From 6f4f6c14fa8eede394218fd768378c81645540f7 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 11 Jun 2022 14:02:48 +0800 Subject: [PATCH 26/30] enh(tmq): add log --- source/dnode/mnode/impl/src/mndConsumer.c | 7 +++++++ source/dnode/mnode/impl/src/mndOffset.c | 5 ++++- source/dnode/vnode/src/tq/tq.c | 12 +++++++++++- tests/system-test/fulltest.sh | 2 +- 4 files changed, 23 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 7aabbf1e4f..4da3c906d7 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -447,6 +447,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { pConsumerOld = mndAcquireConsumer(pMnode, consumerId); if (pConsumerOld == NULL) { + mInfo("receive subscribe request from new consumer: %ld", consumerId); + pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256); pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; @@ -463,7 +465,12 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { } else { /*taosRLockLatch(&pConsumerOld->lock);*/ + int32_t status = atomic_load_32(&pConsumerOld->status); + + mInfo("receive subscribe request from old consumer: %ld, current status: %s", consumerId, + mndConsumerStatusName(status)); + if (status != MQ_CONSUMER_STATUS__READY) { terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; goto SUBSCRIBE_OVER; diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index 00c8bb30d0..18f2e993b2 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -183,7 +183,10 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) { for (int32_t i = 0; i < commitOffsetReq.num; i++) { SMqOffset *pOffset = &commitOffsetReq.offsets[i]; + mInfo("commit offset %ld to vg %d of consumer group %s on topic %s", pOffset->offset, pOffset->vgId, + pOffset->cgroup, pOffset->topicName); if (mndMakePartitionKey(key, pOffset->cgroup, pOffset->topicName, pOffset->vgId) < 0) { + mError("submit offset to topic %s failed", pOffset->topicName); return -1; } bool create = false; @@ -192,7 +195,7 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) { SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOffset->topicName); if (pTopic == NULL) { terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; - mError("submit offset to topic %s failed since %s", pOffset->topicName, terrstr()); + mError("submit offset to topic %s failed since %s", pOffset->topicName, terrstr()); continue; } pOffsetObj = taosMemoryMalloc(sizeof(SMqOffsetObj)); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2f4d6c11c6..e810bb9908 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -130,7 +130,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); - ASSERT(pHandle); + /*ASSERT(pHandle);*/ + if (pHandle == NULL) { + tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, pTq->pVnode->config.vgId, + pReq->subKey); + return -1; + } + if (pHandle->consumerId != consumerId) { + tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld", + consumerId, pTq->pVnode->config.vgId, pReq->subKey, pHandle->consumerId); + return -1; + } int32_t consumerEpoch = atomic_load_32(&pHandle->epoch); while (consumerEpoch < reqEpoch) { diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 4d8a2a66eb..abc8a81248 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -98,7 +98,7 @@ python3 ./test.py -f 2-query/statecount.py python3 ./test.py -f 7-tmq/basic5.py python3 ./test.py -f 7-tmq/subscribeDb.py python3 ./test.py -f 7-tmq/subscribeDb0.py -#python3 ./test.py -f 7-tmq/subscribeDb1.py +python3 ./test.py -f 7-tmq/subscribeDb1.py python3 ./test.py -f 7-tmq/subscribeStb.py python3 ./test.py -f 7-tmq/subscribeStb0.py python3 ./test.py -f 7-tmq/subscribeStb1.py From 94c8ca1e068bc8aa26baa70dce61c29be35fa15b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 11 Jun 2022 14:10:00 +0800 Subject: [PATCH 27/30] fix(query): add null pointer check. --- source/common/src/tdatablock.c | 2 +- source/libs/executor/src/executorimpl.c | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index c1f2726629..5b71578190 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1238,7 +1238,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { return NULL; } - if (pSrc->pData== NULL || pDst->pData == NULL) { + if (pSrc->pData == NULL) { continue; } colDataAssign(pDst, pSrc, pDataBlock->info.rows); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index eda24cb0d6..516824a446 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1857,7 +1857,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i); SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); // it is a reserved column for scalar function, and no data in this column yet. - if (pSrc->pData == NULL) { + if (pDst->pData == NULL) { continue; } @@ -1882,10 +1882,8 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR } else { ASSERT(pBlock->info.rows == numOfRows); } - - // write back -// colDataAssign(pSrc, pDst, pBlock->info.rows); } + blockDataDestroy(px); // fix memory leak } else { // do nothing From d210ef1c0eb9e64f63559833270b1395b61f37d3 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Sat, 11 Jun 2022 14:27:45 +0800 Subject: [PATCH 28/30] feat(stream): stream max delay --- include/common/tcommon.h | 1 + include/common/tmsg.h | 1 + source/dnode/mnode/impl/src/mndScheduler.c | 20 ++--- source/libs/executor/src/timewindowoperator.c | 80 +++++++++++-------- 4 files changed, 59 insertions(+), 43 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 362f56c865..cc51f96f6c 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -50,6 +50,7 @@ typedef enum EStreamType { STREAM_INVERT, STREAM_REPROCESS, STREAM_INVALID, + STREAM_GET_ALL, } EStreamType; typedef struct { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d5946516e8..e051a43f21 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1496,6 +1496,7 @@ typedef struct { #define STREAM_TRIGGER_AT_ONCE 1 #define STREAM_TRIGGER_WINDOW_CLOSE 2 +#define STREAM_TRIGGER_MAX_DELAY 3 typedef struct { char name[TSDB_TABLE_FNAME_LEN]; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index deb7382183..d41928e909 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -167,17 +167,17 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj* } } } - } else { - pTask->dispatchType = TASK_DISPATCH__FIXED; - pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; - SArray* pArray = taosArrayGetP(pStream->tasks, 0); - // one sink only - ASSERT(taosArrayGetSize(pArray) == 1); - SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); - pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; - pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; - pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; } + } else { + pTask->dispatchType = TASK_DISPATCH__FIXED; + pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; + SArray* pArray = taosArrayGetP(pStream->tasks, 0); + // one sink only + ASSERT(taosArrayGetSize(pArray) == 1); + SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); + pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; + pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; + pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; } return 0; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 71e6a3ad09..ab595a3e34 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -750,14 +750,15 @@ int64_t getReskey(void* data, int32_t index) { return *(int64_t*)pos->key; } -static int32_t saveResult(SResultRow* result, uint64_t groupId, SArray* pUpdated) { +static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, + SArray* pUpdated) { int32_t size = taosArrayGetSize(pUpdated); - int32_t index = binarySearch(pUpdated, size, result->win.skey, TSDB_ORDER_DESC, getReskey); + int32_t index = binarySearch(pUpdated, size, ts, TSDB_ORDER_DESC, getReskey); if (index == -1) { index = 0; } else { TSKEY resTs = getReskey(pUpdated, index); - if (resTs < result->win.skey) { + if (resTs < ts) { index++; } else { return TSDB_CODE_SUCCESS; @@ -769,14 +770,18 @@ static int32_t saveResult(SResultRow* result, uint64_t groupId, SArray* pUpdated return TSDB_CODE_OUT_OF_MEMORY; } newPos->groupId = groupId; - newPos->pos = (SResultRowPosition){.pageId = result->pageId, .offset = result->offset}; - *(int64_t*)newPos->key = result->win.skey; + newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset}; + *(int64_t*)newPos->key = ts; if (taosArrayInsert(pUpdated, index, &newPos) == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } return TSDB_CODE_SUCCESS; } +static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) { + return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated); +} + static void removeResult(SArray* pUpdated, TSKEY key) { int32_t size = taosArrayGetSize(pUpdated); int32_t index = binarySearch(pUpdated, size, key, TSDB_ORDER_DESC, getReskey); @@ -819,7 +824,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - saveResult(pResult, tableGroupId, pUpdated); + saveResultRow(pResult, tableGroupId, pUpdated); } } @@ -869,7 +874,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - saveResult(pResult, tableGroupId, pUpdated); + saveResultRow(pResult, tableGroupId, pUpdated); } } @@ -1243,6 +1248,23 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, SInterva } } +static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) { + void* pIte = NULL; + size_t keyLen = 0; + while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { + void* key = taosHashGetKey(pIte, &keyLen); + uint64_t groupId = *(uint64_t*)key; + ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))); + TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); + SResultRowPosition* pPos = (SResultRowPosition*)pIte; + int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, resWins); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + return TSDB_CODE_SUCCESS; +} + static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, SArray* closeWins) { void* pIte = NULL; @@ -1259,16 +1281,12 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))]; SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId); taosHashRemove(pHashMap, keyBuf, keyLen); - SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); - if (pos == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - pos->groupId = groupId; - pos->pos = *(SResultRowPosition*)pIte; - *(int64_t*)pos->key = ts; - if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && !taosArrayPush(closeWins, &pos)) { - taosMemoryFree(pos); - return TSDB_CODE_OUT_OF_MEMORY; + SResultRowPosition* pPos = (SResultRowPosition*)pIte; + if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, closeWins); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } } @@ -1296,8 +1314,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); - SArray* pClosed = taosArrayInit(4, POINTER_BYTES); - while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -1316,19 +1332,18 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, 0, pOperator->numOfExprs, pBlock, NULL); qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); continue; + } else if (pBlock->info.type == STREAM_GET_ALL && + pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) { + getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated); + continue; } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated); } + closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated); - closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pClosed); - finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset); - taosArrayAddAll(pUpdated, pClosed); - - taosArrayDestroy(pClosed); finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); - initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); @@ -1898,7 +1913,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { - saveResult(pResult, tableGroupId, pUpdated); + saveResultRow(pResult, tableGroupId, pUpdated); } // window start(end) key interpolation // doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, @@ -1993,7 +2008,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; SOperatorInfo* downstream = pOperator->pDownstream[0]; SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); - SArray* pClosed = taosArrayInit(4, POINTER_BYTES); if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -2042,7 +2056,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex); taosArrayDestroy(pUpWins); break; + } else if (pBlock->info.type == STREAM_GET_ALL && isFinalInterval(pInfo) && + pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) { + getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated); + continue; } + if (isFinalInterval(pInfo)) { int32_t chIndex = getChildIndex(pBlock); int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -2064,13 +2083,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } if (isFinalInterval(pInfo)) { - closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pClosed); - finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset); - if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { - taosArrayAddAll(pUpdated, pClosed); - } + closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated); } - taosArrayDestroy(pClosed); finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); From 0fbf30ce6d105fc7ba88a9faf1a09ea0b39b20d2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 11 Jun 2022 14:51:54 +0800 Subject: [PATCH 29/30] refactor: do some internal refactor, and add some check before create exchange operator. --- source/client/src/clientImpl.c | 3 +-- source/client/test/clientTests.cpp | 4 ++-- source/common/src/tdatablock.c | 1 - source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorimpl.c | 19 +++++++++++-------- source/libs/executor/src/groupoperator.c | 2 +- source/libs/executor/src/scanoperator.c | 6 +++--- source/libs/executor/src/sortoperator.c | 9 +++++++-- source/libs/executor/src/tsort.c | 14 ++++++++++++-- 9 files changed, 38 insertions(+), 22 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 65a91fe426..dc83f7bc43 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -689,16 +689,15 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) { .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE}; SAppInstInfo* pAppInfo = getAppInfo(pRequest); - if (TSDB_CODE_SUCCESS == code) { code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList); - tscError("0x%"PRIx64" failed to create query plan, code:%s 0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); } if (TSDB_CODE_SUCCESS == code) { schedulerAsyncExecJob(pAppInfo->pTransporter, pNodeList, pRequest->body.pDag, &pRequest->body.queryJob, pRequest->sqlstr, pRequest->metric.start, schedulerExecCb, pRequest); } else { + tscError("0x%"PRIx64" failed to create query plan, code:%s 0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); pRequest->body.queryFp(pRequest->body.param, pRequest, code); } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index f9825c50ff..dc15c0b13d 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -778,9 +778,9 @@ TEST(testCase, async_api_test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); - taos_query(pConn, "use test"); + taos_query(pConn, "use nest"); - TAOS_RES* pRes = taos_query(pConn, "desc abc1.tu"); + TAOS_RES* pRes = taos_query(pConn, "select NOW() from (select * from regular_table_2 where tbname in ('regular_table_2_1') and q_bigint <= 9223372036854775807 and q_tinyint <= 127 and q_bool in ( true , false) ) order by ts;"); if (taos_errno(pRes) != 0) { printf("failed, reason:%s\n", taos_errstr(pRes)); } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 38be0b8f0a..d8897c3b9c 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -322,7 +322,6 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p pColumnInfoData->varmeta.length = pSource->varmeta.length; } else { char* tmp = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(numOfRows)); - printf("----------------%d\n", BitmapLen(numOfRows)); if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index a8a95b513a..da681bd13b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -760,7 +760,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scan int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); void doSetOperatorCompleted(SOperatorInfo* pOperator); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, bool needFree); +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset); void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols); void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index eda24cb0d6..b1ad7a1cf2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1821,7 +1821,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, bool needFree) { +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { if (pFilterNode == NULL) { return; } @@ -2829,7 +2829,6 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { return seqLoadRemoteData(pOperator); } else { return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); - // return concurrentlyLoadRemoteData(pOperator); } } @@ -2855,9 +2854,14 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { return TSDB_CODE_SUCCESS; } -static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo) { +static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) { size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints); + if (numOfSources == 0) { + qError("%s invalid number: %d of sources in exchange operator", id, (int32_t) numOfSources); + return TSDB_CODE_INVALID_PARA; + } + pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode)); pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) { @@ -2879,7 +2883,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode goto _error; } - int32_t code = initExchangeOperator(pExNode, pInfo); + int32_t code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2908,7 +2912,7 @@ _error: taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + pTaskInfo->code = code; return NULL; } @@ -3677,7 +3681,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { longjmp(pTaskInfo->env, code); } - doFilter(pProjectInfo->pFilterNode, pBlock, true); + doFilter(pProjectInfo->pFilterNode, pBlock); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); @@ -5189,7 +5193,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond); if (NULL == (*pTaskInfo)->pRoot) { - code = terrno; + code = (*pTaskInfo)->code; goto _complete; } @@ -5202,7 +5206,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead _complete: taosMemoryFreeClear(*pTaskInfo); - terrno = code; return code; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 5965f5d7ad..132f93a6a5 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -359,7 +359,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { while(1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pRes, true); + doFilter(pInfo->pCondition, pRes); bool hasRemain = hashRemainDataInGroupInfo(&pInfo->groupResInfo); if (!hasRemain) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6c513a05b7..637534ad96 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -267,7 +267,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca } int64_t st = taosGetTimestampMs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, false); + doFilter(pTableScanInfo->pFilterNode, pBlock); int64_t et = taosGetTimestampMs(); pTableScanInfo->readRecorder.filterTime += (et - st); @@ -939,7 +939,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes); } - doFilter(pInfo->pCondition, pInfo->pRes, false); + doFilter(pInfo->pCondition, pInfo->pRes); blockDataUpdateTsWindow(pInfo->pRes, 0); break; } @@ -1711,7 +1711,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { } pRes->info.rows = count; - doFilter(pInfo->pFilterNode, pRes, true); + doFilter(pInfo->pFilterNode, pRes); pOperator->resultInfo.totalRows += pRes->info.rows; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index dcaa95e28a..ff093741fa 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -296,7 +296,10 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { } SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, - SArray* pColMatchInfo, SMultiwaySortMergeOperatorInfo* pInfo) { + SArray* pColMatchInfo, SOperatorInfo* pOperator) { + SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + blockDataCleanup(pDataBlock); SSDataBlock* p = tsortGetSortedDataBlock(pHandle); @@ -354,6 +357,8 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData } blockDataDestroy(p); + + qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.rows); return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; } @@ -371,7 +376,7 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) { } SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, - pOperator->resultInfo.capacity, pInfo->pColMatchInfo, pInfo); + pOperator->resultInfo.capacity, pInfo->pColMatchInfo, pOperator); if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index ea8ded2c30..f63a9351c6 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -227,6 +227,8 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { SSortSource* pSource = cmpParam->pSources[i]; pSource->src.pBlock = pHandle->fetchfp(pSource->param); + + // set current source id done if (pSource->src.pBlock == NULL) { pSource->src.rowIndex = -1; ++pHandle->numOfCompletedSources; @@ -426,8 +428,16 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages)); pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs; - qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort elapsed:%"PRId64", total elapsed:%"PRId64, - pHandle->idStr, (int32_t) (sortPass + 1), pHandle->pBuf ? getTotalBufSize(pHandle->pBuf) : 0, pHandle->sortElapsed, pHandle->totalElapsed); + + if (sortPass > 0) { + size_t s = pHandle->pBuf ? getTotalBufSize(pHandle->pBuf) : 0; + qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%" PRIzu + ", sort elapsed:%" PRId64 ", total elapsed:%" PRId64, + pHandle->idStr, (int32_t)(sortPass + 1), s, pHandle->sortElapsed, pHandle->totalElapsed); + } else { + qDebug("%s ordered source:%"PRIzu", available buf:%d, no need internal sort", pHandle->idStr, numOfSources, + pHandle->numOfPages); + } int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize); blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows); From 767b768e5137e29bc1c14a850d52abbe13b16cad Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 11 Jun 2022 15:03:23 +0800 Subject: [PATCH 30/30] fix(stream): fix shuffle vg id not initialized --- source/dnode/mnode/impl/src/mndScheduler.c | 54 ++++++++++++---------- source/libs/stream/src/streamDispatch.c | 6 ++- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index c04e416896..a4ddf96630 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -151,33 +151,36 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj* ASSERT(pDb); if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { - sdbRelease(pMnode->pSdb, pDb); + ASSERT(0); + return -1; + } + sdbRelease(pMnode->pSdb, pDb); - SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t sz = taosArrayGetSize(pVgs); - SArray* sinkLv = taosArrayGetP(pStream->tasks, 0); - int32_t sinkLvSize = taosArrayGetSize(sinkLv); - for (int32_t i = 0; i < sz; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); - for (int32_t j = 0; j < sinkLvSize; j++) { - SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); - if (pLastLevelTask->nodeId == pVgInfo->vgId) { - pVgInfo->taskId = pLastLevelTask->taskId; - break; - } + SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t sz = taosArrayGetSize(pVgs); + SArray* sinkLv = taosArrayGetP(pStream->tasks, 0); + int32_t sinkLvSize = taosArrayGetSize(sinkLv); + for (int32_t i = 0; i < sz; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); + for (int32_t j = 0; j < sinkLvSize; j++) { + SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); + if (pLastLevelTask->nodeId == pVgInfo->vgId) { + pVgInfo->taskId = pLastLevelTask->taskId; + ASSERT(pVgInfo->taskId != 0); + break; } } - } else { - pTask->dispatchType = TASK_DISPATCH__FIXED; - pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; - SArray* pArray = taosArrayGetP(pStream->tasks, 0); - // one sink only - ASSERT(taosArrayGetSize(pArray) == 1); - SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); - pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; - pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; - pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; } + } else { + pTask->dispatchType = TASK_DISPATCH__FIXED; + pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; + SArray* pArray = taosArrayGetP(pStream->tasks, 0); + // one sink only + ASSERT(taosArrayGetSize(pArray) == 1); + SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); + pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; + pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; + pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; } return 0; } @@ -379,7 +382,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { pFinalTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; // dispatch - mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask); + if (mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask) < 0) { + qDestroyQueryPlan(pPlan); + return -1; + } // exec pFinalTask->execType = TASK_EXEC__PIPE; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 6fb5d75d86..d523374638 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -100,7 +100,6 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM .upstreamNodeId = pTask->nodeId, .blockNum = blockNum, }; - qInfo("dispatch from task %d (child id %d)", pTask->taskId, pTask->childId); req.data = taosArrayInit(blockNum, sizeof(void*)); req.dataLen = taosArrayInit(blockNum, sizeof(int32_t)); @@ -142,11 +141,14 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM break; } } - ASSERT(vgId != 0); } + ASSERT(vgId != 0); req.taskId = downstreamTaskId; + qInfo("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->childId, + downstreamTaskId, vgId); + // serialize int32_t tlen; tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code);