From a72d87b3ff16e34ce0c469b71148b15f1cef7f95 Mon Sep 17 00:00:00 2001 From: charles Date: Mon, 6 Nov 2023 18:59:34 +0800 Subject: [PATCH 01/28] add non marterial test cases and update sql.error function by charles --- tests/parallel_test/cases.task | 1 + tests/pytest/util/sql.py | 4 +- .../view/non_marterial_view/test_view.py | 591 ++++++++++++++++++ 3 files changed, 594 insertions(+), 2 deletions(-) create mode 100644 tests/system-test/0-others/view/non_marterial_view/test_view.py diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 21dcd16441..22f6199ee9 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -225,6 +225,7 @@ e ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/ttlChangeOnWrite.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/compress_tsz1.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/compress_tsz2.py +,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/view/non_marterial_view/test_view.py ,,n,system-test,python3 ./test.py -f 0-others/compatibility.py ,,n,system-test,python3 ./test.py -f 0-others/tag_index_basic.py ,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 7dcf6bc3f2..c05df0a852 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -88,7 +88,7 @@ class TDSql: expectErrNotOccured = False self.errno = e.errno error_info = repr(e) - self.error_info = error_info[error_info.index('(')+1:-1].split(",")[0].replace("'","") + self.error_info = ','.join(error_info[error_info.index('(')+1:-1].split(",")[:-1]).replace("'","") # self.error_info = (','.join(error_info.split(",")[:-1]).split("(",1)[1:][0]).replace("'","") if expectErrNotOccured: tdLog.exit("%s(%d) failed: sql:%s, expect error not occured" % (caller.filename, caller.lineno, sql)) @@ -106,7 +106,7 @@ class TDSql: tdLog.info("sql:%s, expect error occured" % (sql)) if expectErrInfo != None: - if expectErrInfo == self.error_info: + if expectErrInfo == self.error_info or expectErrInfo in self.error_info: tdLog.info("sql:%s, expected expectErrInfo %s occured" % (sql, expectErrInfo)) else: tdLog.exit("%s(%d) failed: sql:%s, expectErrInfo %s occured, but not expected errno %s" % (caller.filename, caller.lineno, sql, self.error_info, expectErrInfo)) diff --git a/tests/system-test/0-others/view/non_marterial_view/test_view.py b/tests/system-test/0-others/view/non_marterial_view/test_view.py new file mode 100644 index 0000000000..afb2476305 --- /dev/null +++ b/tests/system-test/0-others/view/non_marterial_view/test_view.py @@ -0,0 +1,591 @@ + +import taos +import os +import sys +import time +from pathlib import Path +sys.path.append(os.path.dirname(Path(__file__).resolve().parent.parent.parent) + "/7-tmq") + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from util.sqlset import * +from tmqCommon import * + +class TDTestCase: + """This test case is used to veirfy the tmq consume data from non marterial view + """ + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + self.setsql = TDSetSql() + + # db info + self.dbname = "view_db" + self.stbname = 'stb' + self.ctbname_list = ["ct1", "ct2"] + self.stable_column_dict = { + 'ts': 'timestamp', + 'col1': 'float', + 'col2': 'int', + } + self.tag_dict = { + 'ctbname': 'binary(10)' + } + + def 
prepare_data(self, conn=None): + """Create the db and data for test + """ + tdLog.debug("Start to prepare the data") + if not conn: + conn = tdSql + # create datebase + conn.execute(f"create database {self.dbname}") + conn.execute(f"use {self.dbname}") + time.sleep(2) + + # create stable + conn.execute(self.setsql.set_create_stable_sql(self.stbname, self.stable_column_dict, self.tag_dict)) + tdLog.debug("Create stable {} successfully".format(self.stbname)) + + # create child tables + for ctname in self.ctbname_list: + conn.execute(f"create table {ctname} using {self.stbname} tags('{ctname}');") + tdLog.debug("Create child table {} successfully".format(ctname)) + + # insert data into child tables + conn.execute(f"insert into {ctname} values(now, 1.1, 1)(now+1s, 2.2, 2)(now+2s, 3.3, 3)(now+3s, 4.4, 4)(now+4s, 5.5, 5)(now+5s, 6.6, 6)(now+6s, 7.7, 7)(now+7s, 8.8, 8)(now+8s, 9.9, 9)(now+9s, 10.1, 10);)") + tdLog.debug(f"Insert into data to {ctname} successfully") + + def prepare_tmq_data(self, para_dic): + tdLog.debug("Start to prepare the tmq data") + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, para_dic["dbName"], para_dic["dropFlag"], vgroups=para_dic["vgroups"], replica=1) + tdLog.info("create stb") + tdCom.create_stable(tdSql, dbname=para_dic["dbName"], stbname=para_dic["stbName"], column_elm_list=para_dic['colSchema'], tag_elm_list=para_dic['tagSchema']) + tdLog.info("create ctb") + tdCom.create_ctable(tdSql, dbname=para_dic["dbName"], stbname=para_dic["stbName"],tag_elm_list=para_dic['tagSchema'], count=para_dic["ctbNum"], default_ctbname_prefix=para_dic['ctbPrefix']) + tdLog.info("insert data") + tmqCom.insert_data(tdSql, para_dic["dbName"], para_dic["ctbPrefix"], para_dic["ctbNum"], para_dic["rowsPerTbl"], para_dic["batchNum"], para_dic["startTs"]) + tdLog.debug("Finish to prepare the tmq data") + + def check_view_num(self, num): + tdSql.query("show views;") + rows = tdSql.queryRows + assert(rows == num) + tdLog.debug(f"Verify the view number successfully") + + def create_user(self, username, password): + tdSql.execute(f"create user {username} pass '{password}';") + tdLog.debug("Create user {} with password {} successfully".format(username, password)) + + def check_permissions(self, username, db_name, permission_dict, view_name=None): + """ + :param permission_dict: {'db': ["read", "write], 'view': ["read", "write", "alter"]} + """ + tdSql.query("select * from information_schema.ins_user_privileges;") + for item in permission_dict.keys(): + if item == "db": + for permission in permission_dict[item]: + assert((username, permission, db_name, "", "", "") in tdSql.queryResult) + tdLog.debug(f"Verify the {item} {db_name} {permission} permission successfully") + elif item == "view": + for permission in permission_dict[item]: + assert((username, permission, db_name, view_name, "", "view") in tdSql.queryResult) + tdLog.debug(f"Verify the {item} {db_name} {view_name} {permission} permission successfully") + else: + raise Exception(f"Invalid permission type: {item}") + + def test_create_view_from_one_database(self): + """This test case is used to verify the create view from one database + """ + self.prepare_data() + tdSql.execute(f"create view v1 as select * from {self.stbname};") + self.check_view_num(1) + tdSql.error(f'create view v1 as select * from {self.stbname};', expectErrInfo='view already exists in db') + tdSql.error(f'create view db2.v2 as select * from {self.stbname};', expectErrInfo='Fail to get table info, error: Database not exist') + tdSql.error(f'create view v2 as 
select c2 from {self.stbname};', expectErrInfo='Invalid column name: c2') + tdSql.error(f'create view v2 as select ts, col1 from tt1;', expectErrInfo='Fail to get table info, error: Table does not exist') + + tdSql.execute(f"drop database {self.dbname}") + tdLog.debug("Finish test case 'test_create_view_from_one_database'") + + def test_create_view_from_multi_database(self): + """This test case is used to verify the create view from multi database + """ + self.prepare_data() + tdSql.execute(f"create view v1 as select * from view_db.{self.stbname};") + self.check_view_num(1) + + self.dbname = "view_db2" + self.prepare_data() + tdSql.execute(f"create view v1 as select * from view_db2.{self.stbname};") + tdSql.execute(f"create view v2 as select * from view_db.v1;") + self.check_view_num(2) + + self.dbname = "view_db" + tdSql.execute(f"drop database view_db;") + tdSql.execute(f"drop database view_db2;") + tdLog.debug("Finish test case 'test_create_view_from_multi_database'") + + def test_create_view_name_params(self): + """This test case is used to verify the create view with different view name params + """ + self.prepare_data() + tdSql.execute(f"create view v1 as select * from {self.stbname};") + self.check_view_num(1) + tdSql.error(f"create view v/2 as select * from {self.stbname};", expectErrInfo='syntax error near "/2 as select * from stb;"') + tdSql.execute(f"create view v2 as select ts, col1 from {self.stbname};") + self.check_view_num(2) + view_name_192_characters = "rzuoxoIXilAGgzNjYActiQwgzZK7PZYpDuaOe1lSJMFMVYXaexh1OfMmk3LvJcQbTeXXW7uGJY8IHuweHF73VHgoZgf0waO33YpZiTKfDQbdWtN4YmR2eWjL84ZtkfjM4huCP6lCysbDMj8YNwWksTdUq70LIyNhHp2V8HhhxyYSkREYFLJ1kOE78v61MQT6" + tdSql.execute(f"create view {view_name_192_characters} as select * from {self.stbname};") + self.check_view_num(3) + tdSql.error(f"create view {view_name_192_characters}1 as select * from {self.stbname};", expectErrInfo='Invalid identifier name: rzuoxoixilaggznjyactiqwgzzk7pzypduaoe1lsjmfmvyxaexh1ofmmk3lvjcqbtexxw7ugjy8ihuwehf73vhgozgf0wao33ypzitkfdqbdwtn4ymr2ewjl84ztkfjm4hucp6lcysbdmj8ynwwkstduq70liynhhp2v8hhhxyyskreyflj1koe78v61mqt61 as select * from stb;') + tdSql.execute(f"drop database {self.dbname}") + tdLog.debug("Finish test case 'test_create_view_name_params'") + + def test_create_view_query(self): + """This test case is used to verify the create view with different data type in query + """ + self.prepare_data() + # add different data type table + tdSql.execute(f"create table tb (ts timestamp, c1 int, c2 int unsigned, c3 bigint, c4 bigint unsigned, c5 float, c6 double, c7 binary(16), c8 smallint, c9 smallint unsigned, c10 tinyint, c11 tinyint unsigned, c12 bool, c13 varchar(16), c14 nchar(8), c15 geometry(21), c16 varbinary(16));") + tdSql.execute(f"create view v1 as select ts, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15, c16 from tb;") + # check data type in create view sql + tdSql.query("desc v1;") + res = tdSql.queryResult + data_type_list = [res[index][1] for index in range(len(res))] + tdLog.debug(data_type_list) + assert('TIMESTAMP' in data_type_list and 'INT' in data_type_list and 'INT UNSIGNED' in data_type_list and 'BIGINT' in data_type_list and 'BIGINT UNSIGNED' in data_type_list and 'FLOAT' in data_type_list and 'DOUBLE' in data_type_list and 'VARCHAR' in data_type_list and 'SMALLINT' in data_type_list and 'SMALLINT UNSIGNED' in data_type_list and 'TINYINT' in data_type_list and 'TINYINT UNSIGNED' in data_type_list and 'BOOL' in data_type_list and 'VARCHAR' in data_type_list and 'NCHAR' in 
data_type_list and 'GEOMETRY' in data_type_list and 'VARBINARY' in data_type_list) + tdSql.execute("create view v2 as select * from tb where c1 >5 and c7 like '%ab%';") + self.check_view_num(2) + tdSql.error("create view v3 as select * from tb where c1 like '%ab%';", expectErrInfo='Invalid value type') + tdSql.execute("create view v3 as select first(ts), sum(c1) from tb group by c2 having avg(c4) > 0;") + tdSql.execute("create view v4 as select _wstart,sum(c6) from tb interval(10s);") + tdSql.execute("create view v5 as select * from tb join v2 on tb.ts = v2.ts;") + tdSql.execute("create view v6 as select * from (select ts, c1, c2 from (select * from v2));") + self.check_view_num(6) + for v in ['v1', 'v2', 'v3', 'v4', 'v5', 'v6']: + tdSql.execute(f"drop view {v};") + tdSql.execute(f"drop database {self.dbname}") + tdLog.debug("Finish test case 'test_create_view_query'") + + def test_show_view(self): + """This test case is used to verify the show view + """ + self.prepare_data() + tdSql.execute(f"create view v1 as select * from {self.ctbname_list[0]};") + + # query from show sql + tdSql.query("show views;") + res = tdSql.queryResult + assert(res[0][0] == 'v1' and res[0][1] == 'view_db' and res[0][2] == 'root' and res[0][4] == 'NORMAL' and res[0][5] == 'select * from ct1;') + + # show create sql + tdSql.query("show create view v1;") + res = tdSql.queryResult + assert(res[0][1] == 'CREATE VIEW `view_db`.`v1` AS select * from ct1;') + + # query from desc results + tdSql.query("desc view_db.v1;") + res = tdSql.queryResult + assert(res[0][1] == 'TIMESTAMP' and res[1][1] == 'FLOAT' and res[2][1] == 'INT') + + # query from system table + tdSql.query("select * from information_schema.ins_views;") + res = tdSql.queryResult + assert(res[0][0] == 'v1' and res[0][1] == 'view_db' and res[0][2] == 'root' and res[0][4] == 'NORMAL' and res[0][5] == 'select * from ct1;') + tdSql.error("show db3.views;", expectErrInfo='Database not exist') + tdSql.error("desc viewx;", expectErrInfo='Table does not exist') + tdSql.error(f"show create view {self.dbname}.viewx;", expectErrInfo='view not exists in db') + tdSql.execute(f"drop database {self.dbname}") + tdSql.error("show views;", expectErrInfo='Database not exist') + tdLog.debug("Finish test case 'test_show_view'") + + def test_drop_view(self): + """This test case is used to verify the drop view + """ + self.prepare_data() + self.dbname = "view_db2" + self.prepare_data() + tdSql.execute("create view view_db.v1 as select * from view_db.stb;") + tdSql.execute("create view view_db2.v1 as select * from view_db2.stb;") + # delete view without database name + tdSql.execute("drop view v1;") + # delete view with database name + tdSql.execute("drop view view_db.v1;") + # delete non exist view + tdSql.error("drop view view_db.v11;", expectErrInfo='view not exists in db') + tdSql.execute("drop database view_db") + tdSql.execute("drop database view_db2;") + self.dbname = "view_db" + tdLog.debug("Finish test case 'test_drop_view'") + + def test_view_permission_db_all_view_all(self): + """This test case is used to verify the view permission with db all and view all, + the time sleep to wait the permission take effect + """ + self.prepare_data() + username = "view_test" + password = "test" + self.create_user(username, password) + # grant all db permission to user + tdSql.execute("grant all on view_db.* to view_test;") + + conn = taos.connect(user=username, password=password) + conn.execute(f"use {self.dbname};") + conn.execute("create view v1 as select * from stb;") + res = 
conn.query("show views;") + assert(len(res.fetch_all()) == 1) + tdLog.debug(f"Verify the show view permission of user '{username}' with db all and view all successfully") + self.check_permissions("view_test", "view_db", {"db": ["read", "write"], "view": ["read", "write", "alter"]}, "v1") + tdLog.debug(f"Verify the view permission from system table successfully") + time.sleep(2) + conn.execute("drop view v1;") + tdSql.execute("revoke all on view_db.* from view_test;") + tdSql.execute(f"drop database {self.dbname};") + time.sleep(1) + + # prepare data by user 'view_test' + self.prepare_data(conn) + + conn.execute("create view v1 as select * from stb;") + res = conn.query("show views;") + assert(len(res.fetch_all()) == 1) + tdLog.debug(f"Verify the view permission of user '{username}' with db all and view all successfully") + self.check_permissions("view_test", "view_db", {"db": ["read", "write"], "view": ["read", "write", "alter"]}, "v1") + tdLog.debug(f"Verify the view permission from system table successfully") + time.sleep(2) + conn.execute("drop view v1;") + tdSql.execute("revoke all on view_db.* from view_test;") + tdSql.execute("revoke all on view_db.v1 from view_test;") + tdSql.execute(f"drop database {self.dbname}") + tdSql.execute("drop user view_test;") + tdLog.debug("Finish test case 'test_view_permission_db_all_view_all'") + + def test_view_permission_db_write_view_all(self): + """This test case is used to verify the view permission with db write and view all + """ + username = "view_test" + password = "test" + self.create_user(username, password) + conn = taos.connect(user=username, password=password) + self.prepare_data(conn) + conn.execute("create view v1 as select * from stb;") + tdSql.execute("revoke read on view_db.* from view_test;") + self.check_permissions("view_test", "view_db", {"db": ["write"], "view": ["read", "write", "alter"]}, "v1") + # create view permission error + try: + conn.execute("create view v2 as select * from v1;") + except Exception as ex: + assert("[0x2644]: Permission denied or target object not exist" in str(ex)) + # query from view permission error + try: + conn.query("select * from v1;") + except Exception as ex: + assert("[0x2644]: Permission denied or target object not exist" in str(ex)) + # view query permission + res = conn.query("show views;") + assert(len(res.fetch_all()) == 1) + time.sleep(2) + conn.execute("drop view v1;") + tdSql.execute("revoke write on view_db.* from view_test;") + tdSql.execute(f"drop database {self.dbname}") + tdSql.execute("drop user view_test;") + tdLog.debug("Finish test case 'test_view_permission_db_write_view_all'") + + def test_view_permission_db_write_view_read(self): + """This test case is used to verify the view permission with db write and view read + """ + username = "view_test" + password = "test" + self.create_user(username, password) + conn = taos.connect(user=username, password=password) + self.prepare_data() + + tdSql.execute("create view v1 as select * from stb;") + tdSql.execute("grant write on view_db.* to view_test;") + tdSql.execute("grant read on view_db.v1 to view_test;") + + conn.execute(f"use {self.dbname};") + time.sleep(2) + res = conn.query("select * from v1;") + assert(len(res.fetch_all()) == 20) + + conn.execute("create view v2 as select * from v1;") + # create view from super table of database + try: + conn.execute("create view v3 as select * from stb;") + except Exception as ex: + assert("[0x2644]: Permission denied or target object not exist" in str(ex)) + time.sleep(2) + 
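        # Note: v2 was created by the 'view_test' user itself, so dropping it below should
        # succeed, while v1 belongs to root with only read granted, so the later
        # "drop view v1" is expected to fail with [0x2644] Permission denied.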
conn.execute("drop view v2;") + try: + conn.execute("drop view v1;") + except Exception as ex: + assert("[0x2644]: Permission denied or target object not exist" in str(ex)) + tdSql.execute("revoke read on view_db.v1 from view_test;") + tdSql.execute("revoke write on view_db.* from view_test;") + tdSql.execute(f"drop database {self.dbname}") + tdSql.execute("drop user view_test;") + tdLog.debug("Finish test case 'test_view_permission_db_write_view_read'") + + def test_view_permission_db_write_view_alter(self): + """This test case is used to verify the view permission with db write and view alter + """ + username = "view_test" + password = "test" + self.create_user(username, password) + conn = taos.connect(user=username, password=password) + self.prepare_data() + + tdSql.execute("create view v1 as select * from stb;") + tdSql.execute("grant write on view_db.* to view_test;") + tdSql.execute("grant alter on view_db.v1 to view_test;") + try: + conn.execute(f"use {self.dbname};") + conn.execute("select * from v1;") + except Exception as ex: + assert("[0x2644]: Permission denied or target object not exist" in str(ex)) + time.sleep(2) + conn.execute("drop view v1;") + tdSql.execute("revoke write on view_db.* from view_test;") + tdSql.execute(f"drop database {self.dbname}") + tdSql.execute("drop user view_test;") + tdLog.debug("Finish test case 'test_view_permission_db_write_view_alter'") + + def test_view_permission_db_read_view_all(self): + """This test case is used to verify the view permission with db read and view all + """ + username = "view_test" + password = "test" + self.create_user(username, password) + conn = taos.connect(user=username, password=password) + self.prepare_data() + + tdSql.execute("create view v1 as select * from stb;") + tdSql.execute("grant read on view_db.* to view_test;") + tdSql.execute("grant all on view_db.v1 to view_test;") + try: + conn.execute(f"use {self.dbname};") + conn.execute("create view v2 as select * from v1;") + except Exception as ex: + assert("[0x2644]: Permission denied or target object not exist" in str(ex)) + time.sleep(2) + res = conn.query("select * from v1;") + assert(len(res.fetch_all()) == 20) + conn.execute("drop view v1;") + tdSql.execute("revoke read on view_db.* from view_test;") + tdSql.execute(f"drop database {self.dbname}") + tdSql.execute("drop user view_test;") + tdLog.debug("Finish test case 'test_view_permission_db_read_view_all'") + + def test_view_permission_db_read_view_alter(self): + """This test case is used to verify the view permission with db read and view alter + """ + username = "view_test" + password = "test" + self.create_user(username, password) + conn = taos.connect(user=username, password=password) + self.prepare_data() + + tdSql.execute("create view v1 as select * from stb;") + tdSql.execute("grant read on view_db.* to view_test;") + tdSql.execute("grant alter on view_db.v1 to view_test;") + try: + conn.execute(f"use {self.dbname};") + conn.execute("select * from v1;") + except Exception as ex: + assert("[0x2644]: Permission denied or target object not exist" in str(ex)) + + time.sleep(2) + conn.execute("drop view v1;") + tdSql.execute("revoke read on view_db.* from view_test;") + tdSql.execute(f"drop database {self.dbname}") + tdSql.execute("drop user view_test;") + tdLog.debug("Finish test case 'test_view_permission_db_read_view_alter'") + + def test_view_permission_db_read_view_read(self): + """This test case is used to verify the view permission with db read and view read + """ + username = "view_test" + password 
= "test" + self.create_user(username, password) + conn = taos.connect(user=username, password=password) + self.prepare_data() + + tdSql.execute("create view v1 as select * from stb;") + tdSql.execute("grant read on view_db.* to view_test;") + tdSql.execute("grant read on view_db.v1 to view_test;") + conn.execute(f"use {self.dbname};") + time.sleep(2) + res = conn.query("select * from v1;") + assert(len(res.fetch_all()) == 20) + try: + conn.execute("drop view v1;") + except Exception as ex: + assert("[0x2644]: Permission denied or target object not exist" in str(ex)) + tdSql.execute("revoke read on view_db.* from view_test;") + tdSql.execute("revoke read on view_db.v1 from view_test;") + tdSql.execute(f"drop database {self.dbname}") + tdSql.execute("drop user view_test;") + tdLog.debug("Finish test case 'test_view_permission_db_read_view_read'") + + def test_query_from_view(self): + """This test case is used to verify the query from view + """ + self.prepare_data() + view_name_list = [] + + # common query from super table + tdSql.execute(f"create view v1 as select * from {self.stbname};") + tdSql.query(f"select * from v1;") + rows = tdSql.queryRows + assert(rows == 20) + view_name_list.append("v1") + tdLog.debug("Verify the query from super table successfully") + + # common query from child table + tdSql.execute(f"create view v2 as select * from {self.ctbname_list[0]};") + tdSql.query(f"select * from v2;") + rows = tdSql.queryRows + assert(rows == 10) + view_name_list.append("v2") + tdLog.debug("Verify the query from child table successfully") + + # join query + tdSql.execute(f"create view v3 as select * from {self.stbname} join {self.ctbname_list[1]} on {self.ctbname_list[1]}.ts = {self.stbname}.ts;") + tdSql.query(f"select * from v3;") + rows = tdSql.queryRows + assert(rows == 10) + view_name_list.append("v3") + tdLog.debug("Verify the join query successfully") + + # group by query + tdSql.execute(f"create view v4 as select count(*) from {self.stbname} group by tbname;") + tdSql.query(f"select * from v4;") + rows = tdSql.queryRows + assert(rows == 2) + res = tdSql.queryResult + assert(res[0][0] == 10) + view_name_list.append("v4") + tdLog.debug("Verify the group by query successfully") + + # partition by query + tdSql.execute(f"create view v5 as select sum(col1) from {self.stbname} where col2 > 4 partition by tbname interval(3s);") + tdSql.query(f"select * from v5;") + rows = tdSql.queryRows + assert(rows >= 4) + view_name_list.append("v5") + tdLog.debug("Verify the partition by query successfully") + + # query from nested view + tdSql.execute(f"create view v6 as select * from v5;") + tdSql.query(f"select * from v6;") + rows = tdSql.queryRows + assert(rows >= 4) + view_name_list.append("v6") + tdLog.debug("Verify the query from nested view successfully") + + # delete view + for view in view_name_list: + tdSql.execute(f"drop view {view};") + tdLog.debug(f"Drop view {view} successfully") + tdSql.execute(f"drop database {self.dbname}") + tdLog.debug("Finish test case 'test_query_from_view'") + + def test_tmq_from_view(self): + """This test case is used to verify the tmq consume data from view + """ + # params for db + paraDict = {'dbName': 'view_db', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 1, + 'rowsPerTbl': 10000, + 
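                    # Presumed semantics of the remaining keys (they are consumed by the tmqCommon helpers):
                    # batchNum - rows per insert batch, startTs - timestamp of the first inserted row,
                    # pollDelay - seconds tmq_sim waits before polling, showMsg/showRow - log verbosity flags.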
'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1} + # topic info + topic_name_list = ['topic1'] + view_name_list = ['view1'] + expectRowsList = [] + + self.prepare_tmq_data(paraDict) + + # init consume info, and start tmq_sim, then check consume result + tmqCom.initConsumerTable() + queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + tdSql.execute(f"create view {view_name_list[0]} as {queryString}") + sqlString = "create topic %s as %s" %(topic_name_list[0], "select * from %s"%view_name_list[0]) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + tdSql.query(queryString) + expectRowsList.append(tdSql.getRows()) + + consumerId = 1 + topicList = topic_name_list[0] + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] + keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest' + ifcheckdata = 1 + ifManualCommit = 1 + tmqCom.insertConsumerInfo(consumerId, expectrowcnt, topicList, keyList, ifcheckdata, ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(paraDict['pollDelay'], paraDict["dbName"], paraDict['showMsg'], paraDict['showRow']) + + tdLog.info("wait the consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + if expectRowsList[0] != resultList[0]: + tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) + tdLog.exit("1 tmq consume rows error!") + + tmqCom.checkFileContent(consumerId, queryString) + + time.sleep(10) + for i in range(len(topic_name_list)): + tdSql.query("drop topic %s"%topic_name_list[i]) + for i in range(len(view_name_list)): + tdSql.query("drop view %s"%view_name_list[i]) + + # drop database + tdSql.execute(f"drop database {paraDict['dbName']}") + tdSql.execute("drop database cdb;") + tdLog.debug("Finish test case 'test_tmq_from_view'") + + def run(self): + self.test_create_view_from_one_database() + self.test_create_view_from_multi_database() + self.test_create_view_name_params() + self.test_create_view_query() + self.test_show_view() + self.test_drop_view() + self.test_view_permission_db_all_view_all() + self.test_view_permission_db_write_view_all() + self.test_view_permission_db_write_view_read() + self.test_view_permission_db_write_view_alter() + self.test_view_permission_db_read_view_all() + self.test_view_permission_db_read_view_alter() + self.test_view_permission_db_read_view_read() + self.test_query_from_view() + self.test_tmq_from_view() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) From c1f2f0bb630cd319d6f05339711272325f5bffe8 Mon Sep 17 00:00:00 2001 From: charles Date: Tue, 7 Nov 2023 18:13:23 +0800 Subject: [PATCH 02/28] add test case for ts-4219 by charles --- tests/parallel_test/cases.task | 1 + tests/system-test/1-insert/test_ts4219.py | 27 +++++++++++++++++++++++ 2 files changed, 28 insertions(+) create mode 100644 tests/system-test/1-insert/test_ts4219.py diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 22f6199ee9..082aa13ddb 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -283,6 +283,7 @@ e ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/rowlength64k_4.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/precisionUS.py ,,y,system-test,./pytest.sh 
python3 ./test.py -f 1-insert/precisionNS.py +,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4219.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show_tag_index.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/information_schema.py diff --git a/tests/system-test/1-insert/test_ts4219.py b/tests/system-test/1-insert/test_ts4219.py new file mode 100644 index 0000000000..e6447d77ae --- /dev/null +++ b/tests/system-test/1-insert/test_ts4219.py @@ -0,0 +1,27 @@ +import sys +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import tdDnodes +from math import inf + + +class TDTestCase: + def init(self, conn, logSql, replicaVer=1): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), True) + + def prepare_data(self): + tdSql.execute("create database db;") + tdSql.execute("use db;") + tdSql.execute("create stable st(ts timestamp, c1 int, c2 float) tags(groupname binary(32));") + + def run(self): + tdSql.error("insert into ct1 using st tags('group name 1') values(now, 1, 1.1)(now+1s, 2, 2.2) ct1 using st tags('group 1) values(now+2s, 3, 3.3); ") + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) From 764f8d2e456a00357dae70ea846dc3926f8cbf9c Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 8 Nov 2023 09:03:19 +0800 Subject: [PATCH 03/28] fix(vnode/s3): move init & cleanup to dnode --- source/dnode/mgmt/node_mgmt/src/dmEnv.c | 36 +++++++++++++------------ source/dnode/vnode/src/vnd/vnodeCos.c | 10 ++++--- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index d560ba1644..f79c9d97b8 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -18,17 +18,17 @@ #include "audit.h" #include "libs/function/tudf.h" -#define DM_INIT_AUDIT() \ - do { \ - auditCfg.port = tsMonitorPort; \ - auditCfg.server = tsMonitorFqdn; \ - auditCfg.comp = tsMonitorComp; \ - if (auditInit(&auditCfg) != 0) { \ - return -1; \ - } \ +#define DM_INIT_AUDIT() \ + do { \ + auditCfg.port = tsMonitorPort; \ + auditCfg.server = tsMonitorFqdn; \ + auditCfg.comp = tsMonitorComp; \ + if (auditInit(&auditCfg) != 0) { \ + return -1; \ + } \ } while (0) -static SDnode globalDnode = {0}; +static SDnode globalDnode = {0}; SDnode *dmInstance() { return &globalDnode; } @@ -146,6 +146,9 @@ static bool dmCheckDataDirVersion() { return true; } +extern int32_t s3Begin(); +extern void s3End(); + int32_t dmInit() { dInfo("start to init dnode env"); if (dmDiskInit() != 0) return -1; @@ -156,6 +159,7 @@ int32_t dmInit() { if (dmInitMonitor() != 0) return -1; if (dmInitAudit() != 0) return -1; if (dmInitDnode(dmInstance()) != 0) return -1; + if (s3Begin() != 0) return -1; dInfo("dnode env is initialized"); return 0; @@ -181,6 +185,7 @@ void dmCleanup() { udfStopUdfd(); taosStopCacheRefreshWorker(); dmDiskClose(); + s3End(); dInfo("dnode env is cleaned up"); taosCleanupCfg(); @@ -265,19 +270,19 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) { pWrapper = &pDnode->wrappers[ntype]; - if(pWrapper->func.nodeRoleFp != NULL){ + if (pWrapper->func.nodeRoleFp != NULL) { ESyncRole role = (*pWrapper->func.nodeRoleFp)(pWrapper->pMgmt); dInfo("node:%s, checking node role:%d", pWrapper->name, role); - 
if(role == TAOS_SYNC_ROLE_VOTER){ + if (role == TAOS_SYNC_ROLE_VOTER) { dError("node:%s, failed to alter node type since node already is role:%d", pWrapper->name, role); terrno = TSDB_CODE_MNODE_ALREADY_IS_VOTER; return -1; } } - if(pWrapper->func.isCatchUpFp != NULL){ + if (pWrapper->func.isCatchUpFp != NULL) { dInfo("node:%s, checking node catch up", pWrapper->name); - if((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1){ + if ((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1) { terrno = TSDB_CODE_MNODE_NOT_CATCH_UP; return -1; } @@ -394,7 +399,4 @@ void dmReportStartup(const char *pName, const char *pDesc) { dDebug("step:%s, %s", pStartup->name, pStartup->desc); } -int64_t dmGetClusterId() { - return globalDnode.data.clusterId; -} - +int64_t dmGetClusterId() { return globalDnode.data.clusterId; } diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index 6e36739f5a..a16f926f0f 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -24,7 +24,7 @@ static S3UriStyle uriStyleG = S3UriStylePath; static int retriesG = 5; static int timeoutMsG = 0; -static int32_t s3Begin() { +int32_t s3Begin() { S3Status status; const char *hostname = tsS3Hostname; const char *env_hn = getenv("S3_HOSTNAME"); @@ -43,10 +43,12 @@ static int32_t s3Begin() { return 0; } -static void s3End() { S3_deinitialize(); } -int32_t s3Init() { return s3Begin(); } +void s3End() { S3_deinitialize(); } -void s3CleanUp() { s3End(); } +int32_t s3Init() { return 0; /*s3Begin();*/ } + +void s3CleanUp() { /*s3End();*/ +} static int should_retry() { /* From 8bb65119a4c77f4ab9d309a7bbe9bbe51e0e7dfe Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 8 Nov 2023 10:52:23 +0800 Subject: [PATCH 04/28] dnode: fix dnode s3 init --- source/dnode/mgmt/node_mgmt/src/dmEnv.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index f79c9d97b8..6f13abcebc 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -146,9 +146,13 @@ static bool dmCheckDataDirVersion() { return true; } +#if defined(USE_S3) + extern int32_t s3Begin(); extern void s3End(); +#endif + int32_t dmInit() { dInfo("start to init dnode env"); if (dmDiskInit() != 0) return -1; @@ -159,7 +163,9 @@ int32_t dmInit() { if (dmInitMonitor() != 0) return -1; if (dmInitAudit() != 0) return -1; if (dmInitDnode(dmInstance()) != 0) return -1; +#if defined(USE_S3) if (s3Begin() != 0) return -1; +#endif dInfo("dnode env is initialized"); return 0; @@ -185,7 +191,9 @@ void dmCleanup() { udfStopUdfd(); taosStopCacheRefreshWorker(); dmDiskClose(); +#if defined(USE_S3) s3End(); +#endif dInfo("dnode env is cleaned up"); taosCleanupCfg(); From 15e1e4cd2c1ccc2729651f7a492616d57469641a Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 8 Nov 2023 14:00:10 +0800 Subject: [PATCH 05/28] dnode/node_mgmt: cmake define for use_s3 --- source/dnode/mgmt/node_mgmt/CMakeLists.txt | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/source/dnode/mgmt/node_mgmt/CMakeLists.txt b/source/dnode/mgmt/node_mgmt/CMakeLists.txt index f1be20289a..6b875db860 100644 --- a/source/dnode/mgmt/node_mgmt/CMakeLists.txt +++ b/source/dnode/mgmt/node_mgmt/CMakeLists.txt @@ -3,6 +3,17 @@ add_library(dnode STATIC ${IMPLEMENT_SRC}) target_link_libraries( dnode mgmt_mnode mgmt_qnode mgmt_snode mgmt_vnode mgmt_dnode ) + +IF (TD_STORAGE) + + IF(${BUILD_WITH_S3}) + add_definitions(-DUSE_S3) + 
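+    # These compile definitions appear to gate the "#if defined(USE_S3)" guarded
+    # s3Begin()/s3End() calls added to dmEnv.c earlier in this series.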
ELSEIF(${BUILD_WITH_COS}) + add_definitions(-DUSE_COS) + ENDIF() + +ENDIF () + target_include_directories( dnode PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" From 3f125bc6bf45c878a41bebdaa79251a7c282b2ac Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 8 Nov 2023 14:30:30 +0800 Subject: [PATCH 06/28] vnode/cos: make err msg buffer big enough for detailed msg --- source/dnode/vnode/src/vnd/vnodeCos.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index a16f926f0f..d2db8be026 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -74,7 +74,7 @@ static void s3PrintError(const char *func, S3Status status, char error_details[] } typedef struct { - char err_msg[128]; + char err_msg[512]; S3Status status; uint64_t content_length; char *buf; @@ -203,7 +203,7 @@ static void growbuffer_destroy(growbuffer *gb) { } typedef struct put_object_callback_data { - char err_msg[128]; + char err_msg[512]; S3Status status; // FILE *infile; TdFilePtr infileFD; @@ -216,7 +216,7 @@ typedef struct put_object_callback_data { #define MULTIPART_CHUNK_SIZE (768 << 20) // multipart is 768M typedef struct UploadManager { - char err_msg[128]; + char err_msg[512]; S3Status status; // used for initial multipart char *upload_id; @@ -231,7 +231,7 @@ typedef struct UploadManager { } UploadManager; typedef struct list_parts_callback_data { - char err_msg[128]; + char err_msg[512]; S3Status status; int isTruncated; char nextPartNumberMarker[24]; @@ -248,7 +248,7 @@ typedef struct list_parts_callback_data { } list_parts_callback_data; typedef struct MultipartPartData { - char err_msg[128]; + char err_msg[512]; S3Status status; put_object_callback_data put_object_data; int seq; @@ -611,7 +611,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { } typedef struct list_bucket_callback_data { - char err_msg[128]; + char err_msg[512]; S3Status status; int isTruncated; char nextMarker[1024]; From a1e692a796b64d0d3585cbade05f528b69907084 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 8 Nov 2023 15:18:55 +0800 Subject: [PATCH 07/28] fix(vnode/cos): fix error printing to avoid buffer overflow --- source/dnode/vnode/src/vnd/vnodeCos.c | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index d2db8be026..d7eced78fb 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -99,20 +99,22 @@ static void responseCompleteCallback(S3Status status, const S3ErrorDetails *erro int len = 0; const int elen = sizeof(cbd->err_msg); if (error) { - if (error->message) { + if (error->message && elen - len > 0) { len += snprintf(&(cbd->err_msg[len]), elen - len, " Message: %s\n", error->message); } - if (error->resource) { + if (error->resource && elen - len > 0) { len += snprintf(&(cbd->err_msg[len]), elen - len, " Resource: %s\n", error->resource); } - if (error->furtherDetails) { + if (error->furtherDetails && elen - len > 0) { len += snprintf(&(cbd->err_msg[len]), elen - len, " Further Details: %s\n", error->furtherDetails); } - if (error->extraDetailsCount) { + if (error->extraDetailsCount && elen - len > 0) { len += snprintf(&(cbd->err_msg[len]), elen - len, "%s", " Extra Details:\n"); for (int i = 0; i < error->extraDetailsCount; i++) { - len += snprintf(&(cbd->err_msg[len]), elen - len, " %s: %s\n", 
error->extraDetails[i].name, - error->extraDetails[i].value); + if (elen - len > 0) { + len += snprintf(&(cbd->err_msg[len]), elen - len, " %s: %s\n", error->extraDetails[i].name, + error->extraDetails[i].value); + } } } } @@ -205,6 +207,7 @@ static void growbuffer_destroy(growbuffer *gb) { typedef struct put_object_callback_data { char err_msg[512]; S3Status status; + uint64_t content_length; // FILE *infile; TdFilePtr infileFD; growbuffer *gb; @@ -218,6 +221,7 @@ typedef struct put_object_callback_data { typedef struct UploadManager { char err_msg[512]; S3Status status; + uint64_t content_length; // used for initial multipart char *upload_id; @@ -233,6 +237,7 @@ typedef struct UploadManager { typedef struct list_parts_callback_data { char err_msg[512]; S3Status status; + uint64_t content_length; int isTruncated; char nextPartNumberMarker[24]; char initiatorId[256]; From e5bbcf76f1843a68fe478d5ecf22d80bab16f1f1 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 8 Nov 2023 15:57:56 +0800 Subject: [PATCH 08/28] vnode/cos: fix get object block callback --- source/dnode/vnode/src/vnd/vnodeCos.c | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index d7eced78fb..f9d6ffe544 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -78,6 +78,7 @@ typedef struct { S3Status status; uint64_t content_length; char *buf; + int64_t buf_pos; } TS3SizeCBD; static S3Status responsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData) { @@ -730,15 +731,19 @@ void s3DeleteObjects(const char *object_name[], int nobject) { static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData) { TS3SizeCBD *cbd = callbackData; + /* if (cbd->content_length != bufferSize) { cbd->status = S3StatusAbortedByCallback; return S3StatusAbortedByCallback; } + */ + if (!cbd->buf) { + cbd->buf = taosMemoryCalloc(1, bufferSize); + } - char *buf = taosMemoryCalloc(1, bufferSize); - if (buf) { - memcpy(buf, buffer, bufferSize); - cbd->buf = buf; + if (cbd->buf) { + memcpy(cbd->buf + cbd->buf_pos, buffer, bufferSize); + cbd->buf_pos += bufferSize; cbd->status = S3StatusOK; return S3StatusOK; } else { @@ -760,6 +765,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, TS3SizeCBD cbd = {0}; cbd.content_length = size; + cbd.buf_pos = 0; do { S3_get_object(&bucketContext, object_name, &getConditions, offset, size, 0, 0, &getObjectHandler, &cbd); } while (S3_status_is_retryable(cbd.status) && should_retry()); From 128353a861e0a18a116ff6ab44738d3716101a80 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 8 Nov 2023 16:04:24 +0800 Subject: [PATCH 09/28] vnode/cos: fix get object handler mem --- source/dnode/vnode/src/vnd/vnodeCos.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index f9d6ffe544..0bb16fcd9c 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -738,7 +738,7 @@ static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void * } */ if (!cbd->buf) { - cbd->buf = taosMemoryCalloc(1, bufferSize); + cbd->buf = taosMemoryCalloc(1, cbd->content_length); } if (cbd->buf) { From 5ccdde4495df36350ec342236c1755da8603b16c Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 8 Nov 2023 20:43:52 +0800 Subject: [PATCH 10/28] vnode/cos: error on 
incomplete fetching --- source/dnode/vnode/src/vnd/vnodeCos.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index 0bb16fcd9c..9941c53750 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -775,6 +775,11 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, return TAOS_SYSTEM_ERROR(EIO); } + if (cbd.buf_pos != size) { + vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); + return TAOS_SYSTEM_ERROR(EIO); + } + *ppBlock = cbd.buf; return 0; From c58ec72031866186a1fe495d6ca3e12f5d3b6c3a Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 9 Nov 2023 09:27:43 +0800 Subject: [PATCH 11/28] config/s3blocksize: enable alter for debugging --- source/common/src/tglobal.c | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index cb67fc1ba3..ead9a5926b 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -94,8 +94,8 @@ int32_t tsMonitorMaxLogs = 100; bool tsMonitorComp = false; // audit -bool tsEnableAudit = true; -bool tsEnableAuditCreateTable = true; +bool tsEnableAudit = true; +bool tsEnableAuditCreateTable = true; // telem #ifdef TD_ENTERPRISE @@ -222,7 +222,7 @@ char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPR #ifdef WINDOWS bool tsStartUdfd = false; #else -bool tsStartUdfd = true; +bool tsStartUdfd = true; #endif // wal @@ -332,7 +332,9 @@ int32_t taosSetS3Cfg(SConfig *pCfg) { return 0; } -struct SConfig *taosGetCfg() { return tsCfg; } +struct SConfig *taosGetCfg() { + return tsCfg; +} static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile, char *apolloUrl) { @@ -442,8 +444,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "enableScience", tsEnableScience, CFG_SCOPE_CLIENT, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; - if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != - 0) + if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, CFG_SCOPE_CLIENT, + CFG_DYN_CLIENT) != 0) return -1; if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; @@ -459,7 +461,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { // if (cfgAddInt32(pCfg, "smlBatchSize", tsSmlBatchSize, 1, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_NONE) != 0) // return -1; if (cfgAddInt32(pCfg, "maxShellConns", tsMaxShellConns, 10, 50000000, CFG_SCOPE_CLIENT, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "maxInsertBatchRows", tsMaxInsertBatchRows, 1, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) + if (cfgAddInt32(pCfg, "maxInsertBatchRows", tsMaxInsertBatchRows, 1, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != + 0) return -1; if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, CFG_SCOPE_BOTH, CFG_DYN_CLIENT) != 0) return -1; @@ -546,7 +549,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, 
"statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; + if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) + return -1; if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; @@ -685,7 +689,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "timeseriesThreshold", tsTimeSeriesThreshold, 0, 2000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) + if (cfgAddInt32(pCfg, "timeseriesThreshold", tsTimeSeriesThreshold, 0, 2000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != + 0) return -1; if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, @@ -728,7 +733,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -100, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) + if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -100, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) @@ -1669,6 +1674,7 @@ void taosCfgDynamicOptions(const char *option, const char *value) { {"ttlBatchDropNum", &tsTtlBatchDropNum}, {"ttlFlushThreshold", &tsTtlFlushThreshold}, {"ttlPushInterval", &tsTtlPushIntervalSec}, + {"s3BlockSize", &tsS3BlockSize}, {"s3BlockCacheSize", &tsS3BlockCacheSize}, {"s3PageCacheSize", &tsS3PageCacheSize}, {"s3UploadDelaySec", &tsS3UploadDelaySec}, @@ -1692,8 +1698,8 @@ void taosCfgDynamicOptions(const char *option, const char *value) { switch (pItem->dtype) { case CFG_DTYPE_BOOL: { - int32_t flag = atoi(value); - bool *pVar = options[d].optionVar; + int32_t flag = atoi(value); + bool *pVar = options[d].optionVar; uInfo("%s set from %d to %d", optName, *pVar, flag); *pVar = flag; } break; From 1bb10bb8625a9c30ad651ad9f8d1d15d3a792670 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 9 Nov 2023 09:32:13 +0800 Subject: [PATCH 12/28] vnode/cos: check get object block size optionally --- source/dnode/vnode/src/inc/vndCos.h | 2 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- source/dnode/vnode/src/tsdb/tsdbReaderWriter.c | 2 +- source/dnode/vnode/src/vnd/vnodeCos.c | 15 +++++++++------ 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/inc/vndCos.h b/source/dnode/vnode/src/inc/vndCos.h index 8581b039f8..0a055ed32a 100644 --- a/source/dnode/vnode/src/inc/vndCos.h +++ b/source/dnode/vnode/src/inc/vndCos.h @@ -38,7 +38,7 @@ void s3DeleteObjectsByPrefix(const char *prefix); void s3DeleteObjects(const char *object_name[], int nobject); bool s3Exists(const char *object_name); bool s3Get(const char *object_name, const char *path); -int32_t s3GetObjectBlock(const char *object_name, int64_t 
offset, int64_t size, uint8_t **ppBlock); +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock); void s3EvictCache(const char *path, long object_size); long s3Size(const char *object_name); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index bf6f0cf4d6..249479d6bd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -3099,7 +3099,7 @@ static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) { } */ int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage; - code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, ppBlock); + code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, 0, ppBlock); if (code != TSDB_CODE_SUCCESS) { // taosMemoryFree(pBlock); // code = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index def9a73d10..ba9507530c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -340,7 +340,7 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64 int64_t retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage); int64_t pgnoEnd = pgno - 1 + (size - n + szPgCont - 1) / szPgCont; int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage; - code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, &pBlock); + code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, 1, &pBlock); if (code != TSDB_CODE_SUCCESS) { goto _exit; } diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index 9941c53750..b7a13c1664 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -752,7 +752,7 @@ static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void * } } -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) { int status = 0; int64_t ifModifiedSince = -1, ifNotModifiedSince = -1; const char *ifMatch = 0, *ifNotMatch = 0; @@ -775,7 +775,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, return TAOS_SYSTEM_ERROR(EIO); } - if (cbd.buf_pos != size) { + if (check && cbd.buf_pos != size) { vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); return TAOS_SYSTEM_ERROR(EIO); } @@ -1063,7 +1063,8 @@ bool s3Get(const char *object_name, const char *path) { return ret; } -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, uint8_t **ppBlock) { +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, bool check, uint8_t **ppBlock) { + (void)check; int32_t code = 0; cos_pool_t *p = NULL; int is_cname = 0; @@ -1255,8 +1256,10 @@ void s3DeleteObjectsByPrefix(const char *prefix) {} void s3DeleteObjects(const char *object_name[], int nobject) {} bool s3Exists(const char *object_name) { return false; } bool s3Get(const char *object_name, const char *path) { return false; } -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { return 0; } -void s3EvictCache(const char *path, long object_size) {} -long s3Size(const char *object_name) { return 0; } +int32_t s3GetObjectBlock(const char *object_name, 
int64_t offset, int64_t size, bool check, uint8_t **ppBlock) { + return 0; +} +void s3EvictCache(const char *path, long object_size) {} +long s3Size(const char *object_name) { return 0; } #endif From 98a2ca6bfdb6688706839ad9419adc5cb8f37180 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 9 Nov 2023 10:07:24 +0800 Subject: [PATCH 13/28] change chkpid gen way --- source/dnode/mnode/impl/src/mndStream.c | 261 +++++++++++++----------- 1 file changed, 138 insertions(+), 123 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index fd0c349dd2..eaeed579e5 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -37,23 +37,23 @@ typedef struct SNodeEntry { int32_t nodeId; - bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot. - SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes. - int64_t hbTimestamp; // second + bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot. + SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes. + int64_t hbTimestamp; // second } SNodeEntry; typedef struct SStreamExecInfo { - SArray *pNodeEntryList; + SArray * pNodeEntryList; int64_t ts; // snapshot ts int64_t activeCheckpoint; // active check point id - SHashObj *pTaskMap; - SArray *pTaskList; + SHashObj * pTaskMap; + SArray * pTaskList; TdThreadMutex lock; } SStreamExecInfo; typedef struct SVgroupChangeInfo { SHashObj *pDBMap; - SArray *pUpdateNodeList; // SArray + SArray * pUpdateNodeList; // SArray } SVgroupChangeInfo; static int32_t mndNodeCheckSentinel = 0; @@ -78,7 +78,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static SArray *extractNodeListFromStream(SMnode *pMnode); -static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady); +static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); @@ -91,7 +91,7 @@ static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExe static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); static int32_t doKillActiveCheckpointTrans(SMnode *pMnode); -static int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList); +static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { @@ -193,9 +193,9 @@ STREAM_ENCODE_OVER: SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; - SSdbRow *pRow = NULL; + SSdbRow * pRow = NULL; SStreamObj *pStream = NULL; - void *buf = NULL; + void * buf = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) { @@ -272,7 +272,7 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream } SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName) { - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; SStreamObj *pStream = sdbAcquire(pSdb, SDB_STREAM, streamName); if (pStream == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; @@ -325,7 +325,7 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, 
int64 return TSDB_CODE_SUCCESS; } - SNode *pAst = NULL; + SNode * pAst = NULL; int32_t code = nodesStringToNode(ast, &pAst); SQueryPlan *pPlan = NULL; @@ -350,7 +350,7 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64 } static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) { - SNode *pAst = NULL; + SNode * pAst = NULL; SQueryPlan *pPlan = NULL; mInfo("stream:%s to create", pCreate->name); @@ -589,7 +589,7 @@ int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) { SStbObj *pStb = NULL; - SDbObj *pDb = NULL; + SDbObj * pDb = NULL; SMCreateStbReq createReq = {0}; tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); @@ -715,10 +715,12 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) } static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - int32_t code = -1; - SStreamObj *pStream = NULL; - SDbObj *pDb = NULL; + int32_t code = -1; + + SMnode * pMnode = pReq->info.node; + SStreamObj *pStream = NULL; + SDbObj * pDb = NULL; + SCMCreateStreamReq createStreamReq = {0}; SStreamObj streamObj = {0}; @@ -761,7 +763,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { int32_t numOfStream = 0; SStreamObj *pStream = NULL; - void *pIter = NULL; + void * pIter = NULL; while (1) { pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream); @@ -858,12 +860,12 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { SName name = {0}; tNameFromString(&name, createStreamReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - //reuse this function for stream + // reuse this function for stream - //TODO + // TODO if (createStreamReq.sql != NULL) { - auditRecord(pReq, pMnode->clusterId, "createStream", name.dbname, name.tname, - createStreamReq.sql, strlen(createStreamReq.sql)); + auditRecord(pReq, pMnode->clusterId, "createStream", name.dbname, name.tname, createStreamReq.sql, + strlen(createStreamReq.sql)); } _OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { @@ -877,15 +879,31 @@ _OVER: return code; } +int64_t mndStreamGenChkpId(SMnode *pMnode) { + SStreamObj *pStream = NULL; + void * pIter = NULL; + SSdb * pSdb = pMnode->pSdb; + + int64_t maxChkpId = 0; + while (1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) break; + + maxChkpId = MAX(maxChkpId, pStream->checkpointId); + sdbRelease(pSdb, pStream); + } + return maxChkpId + 1; +} + static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; if (sdbGetSize(pSdb, SDB_STREAM) <= 0) { return 0; } SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); - pMsg->checkpointId = taosGetTimestampMs(); + pMsg->checkpointId = mndStreamGenChkpId(pMnode); int32_t size = sizeof(SMStreamDoCheckpointMsg); SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size}; @@ -919,7 +937,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in return -1; } - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + void * abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); tEncodeStreamCheckpointSourceReq(&encoder, &req); @@ -1042,7 +1060,7 @@ static int32_t 
mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream int32_t totLevel = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < totLevel; i++) { - SArray *pLevel = taosArrayGetP(pStream->tasks, i); + SArray * pLevel = taosArrayGetP(pStream->tasks, i); SStreamTask *pTask = taosArrayGetP(pLevel, 0); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { @@ -1059,7 +1077,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream return -1; } - void *buf; + void * buf; int32_t tlen; if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, chkptId, pTask->id.streamId, pTask->id.taskId) < 0) { @@ -1070,7 +1088,8 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream STransAction action = {0}; SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); - initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY); + initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, + TSDB_CODE_SYN_PROPOSE_NOT_READY); mndReleaseVgroup(pMnode, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { @@ -1110,9 +1129,9 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream } static const char *mndGetStreamDB(SMnode *pMnode) { - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; - void *pIter = NULL; + void * pIter = NULL; pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) { @@ -1126,9 +1145,9 @@ static const char *mndGetStreamDB(SMnode *pMnode) { } static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; + SMnode * pMnode = pReq->info.node; + SSdb * pSdb = pMnode->pSdb; + void * pIter = NULL; SStreamObj *pStream = NULL; int32_t code = 0; @@ -1149,19 +1168,18 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { return 0; } - for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) { - SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i); + for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) { + SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i); if (pNodeEntry->stageUpdated) { mDebug("stream task not ready due to node update detected, checkpoint not issued"); return 0; } } - bool allReady = true; + bool allReady = true; SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady); if (!allReady) { - mWarn("not all vnodes are ready, ignore the checkpoint") - taosArrayDestroy(pNodeSnapshot); + mWarn("not all vnodes are ready, ignore the checkpoint") taosArrayDestroy(pNodeSnapshot); return 0; } @@ -1182,15 +1200,15 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { taosThreadMutexLock(&execInfo.lock); for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { - STaskId *p = taosArrayGet(execInfo.pTaskList, i); - STaskStatusEntry* pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); + STaskId * p = taosArrayGet(execInfo.pTaskList, i); + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); if (pEntry == NULL) { continue; } if (pEntry->status != TASK_STATUS__READY) { mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued", - pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamTaskGetStatusStr(pEntry->status)); + pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamTaskGetStatusStr(pEntry->status)); ready = false; 
break; } @@ -1250,7 +1268,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { } static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; + SMnode * pMnode = pReq->info.node; SStreamObj *pStream = NULL; SMDropStreamReq dropReq = {0}; @@ -1327,7 +1345,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { SName name = {0}; tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - //reuse this function for stream + // reuse this function for stream auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen); @@ -1379,7 +1397,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { } int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) { - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; SDbObj *pDb = mndAcquireDb(pMnode, dbName); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; @@ -1387,7 +1405,7 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) } int32_t numOfStreams = 0; - void *pIter = NULL; + void * pIter = NULL; while (1) { SStreamObj *pStream = NULL; pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); @@ -1406,8 +1424,8 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) } static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; + SMnode * pMnode = pReq->info.node; + SSdb * pSdb = pMnode->pSdb; int32_t numOfRows = 0; SStreamObj *pStream = NULL; @@ -1483,8 +1501,8 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) { } static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; + SMnode * pMnode = pReq->info.node; + SSdb * pSdb = pMnode->pSdb; int32_t numOfRows = 0; SStreamObj *pStream = NULL; @@ -1573,13 +1591,13 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock // status char status[20 + VARSTR_HEADER_SIZE] = {0}; - STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); if (pe == NULL) { continue; } - const char* pStatus = streamTaskGetStatusStr(pe->status); + const char *pStatus = streamTaskGetStatusStr(pe->status); STR_TO_VARSTR(status, pStatus); // status @@ -1591,24 +1609,24 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false); // input queue - char vbuf[30] = {0}; - char buf[25] = {0}; - const char* queueInfoStr = "%4.2fMiB (%5.2f%)"; + char vbuf[30] = {0}; + char buf[25] = {0}; + const char *queueInfoStr = "%4.2fMiB (%5.2f%)"; sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate); STR_TO_VARSTR(vbuf, buf); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); + colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); // output queue -// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); -// STR_TO_VARSTR(vbuf, buf); + // sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); + // STR_TO_VARSTR(vbuf, buf); -// pColInfo = 
taosArrayGet(pBlock->pDataBlock, cols++); -// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); + // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + // colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - const char* sinkStr = "%.2fMiB"; + const char *sinkStr = "%.2fMiB"; sprintf(buf, sinkStr, pe->sinkDataSize); } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { // offset info @@ -1619,7 +1637,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock STR_TO_VARSTR(vbuf, buf); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); + colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); numOfRows++; } @@ -1663,7 +1681,7 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { } int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { - SArray* tasks = pStream->tasks; + SArray *tasks = pStream->tasks; int32_t size = taosArrayGetSize(tasks); for (int32_t i = 0; i < size; i++) { @@ -1700,7 +1718,7 @@ static int32_t mndPersistStreamLog(STrans *pTrans, const SStreamObj *pStream, in } static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; + SMnode * pMnode = pReq->info.node; SStreamObj *pStream = NULL; SMPauseStreamReq pauseReq = {0}; @@ -1816,7 +1834,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn } static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; + SMnode * pMnode = pReq->info.node; SStreamObj *pStream = NULL; SMResumeStreamReq pauseReq = {0}; @@ -1901,7 +1919,7 @@ static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChang } static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId, - SStreamTaskId* pId, int32_t transId) { + SStreamTaskId *pId, int32_t transId) { SStreamTaskNodeUpdateMsg req = {0}; initNodeUpdateMsg(&req, pInfo, pId, transId); @@ -1924,7 +1942,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha return -1; } - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + void * abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); tEncodeStreamTaskUpdateMsg(&encoder, &req); @@ -1991,7 +2009,7 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p for (int32_t k = 0; k < numOfTasks; ++k) { SStreamTask *pTask = taosArrayGetP(pLevel, k); - void *pBuf = NULL; + void * pBuf = NULL; int32_t len = 0; streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); @@ -2012,7 +2030,7 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) { const SEp *pEp = GET_ACTIVE_EP(pPrevEpset); - const SEp* p = GET_ACTIVE_EP(pCurrent); + const SEp *p = GET_ACTIVE_EP(pCurrent); if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) { return false; @@ -2066,9 +2084,9 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP return info; } -static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady) { - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; +static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { + SSdb * pSdb = pMnode->pSdb; + 
void * pIter = NULL; SVgObj *pVgroup = NULL; *allReady = true; @@ -2115,8 +2133,8 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange // check all streams that involved this vnode should update the epset info SStreamObj *pStream = NULL; - void *pIter = NULL; - STrans *pTrans = NULL; + void * pIter = NULL; + STrans * pTrans = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); @@ -2177,9 +2195,9 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange } static SArray *extractNodeListFromStream(SMnode *pMnode) { - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; - void *pIter = NULL; + void * pIter = NULL; SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); while (1) { @@ -2226,9 +2244,9 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) { } static void doExtractTasksFromStream(SMnode *pMnode) { - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; - void *pIter = NULL; + void * pIter = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); @@ -2263,11 +2281,11 @@ static int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) { return TSDB_CODE_SUCCESS; } -static bool taskNodeExists(SArray* pList, int32_t nodeId) { +static bool taskNodeExists(SArray *pList, int32_t nodeId) { size_t num = taosArrayGetSize(pList); - for(int32_t i = 0; i < num; ++i) { - SNodeEntry* pEntry = taosArrayGet(pList, i); + for (int32_t i = 0; i < num; ++i) { + SNodeEntry *pEntry = taosArrayGet(pList, i); if (pEntry->nodeId == nodeId) { return true; } @@ -2277,12 +2295,12 @@ static bool taskNodeExists(SArray* pList, int32_t nodeId) { } int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { - SArray* pRemovedTasks = taosArrayInit(4, sizeof(STaskId)); + SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId)); int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList); - for(int32_t i = 0; i < numOfTask; ++i) { - STaskId* pId = taosArrayGet(execInfo.pTaskList, i); - STaskStatusEntry* pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); + for (int32_t i = 0; i < numOfTask; ++i) { + STaskId * pId = taosArrayGet(execInfo.pTaskList, i); + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId); if (!existed) { @@ -2290,21 +2308,21 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { } } - for(int32_t i = 0; i < taosArrayGetSize(pRemovedTasks); ++i) { - STaskId* pId = taosArrayGet(pRemovedTasks, i); + for (int32_t i = 0; i < taosArrayGetSize(pRemovedTasks); ++i) { + STaskId *pId = taosArrayGet(pRemovedTasks, i); doRemoveTasks(&execInfo, pId); } mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks), - (int32_t) taosArrayGetSize(execInfo.pTaskList)); + (int32_t)taosArrayGetSize(execInfo.pTaskList)); int32_t size = taosArrayGetSize(pNodeSnapshot); - SArray* pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry)); - for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) { - SNodeEntry* p = taosArrayGet(execInfo.pNodeEntryList, i); + SArray *pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry)); + for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) { + SNodeEntry *p = taosArrayGet(execInfo.pNodeEntryList, i); - for(int32_t j = 0; j < size; ++j) { - SNodeEntry* pEntry = 
taosArrayGet(pNodeSnapshot, j); + for (int32_t j = 0; j < size; ++j) { + SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j); if (pEntry->nodeId == p->nodeId) { taosArrayPush(pValidNodeEntryList, p); break; @@ -2315,7 +2333,7 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList); execInfo.pNodeEntryList = pValidNodeEntryList; - mDebug("remain %d valid node entries", (int32_t) taosArrayGetSize(pValidNodeEntryList)); + mDebug("remain %d valid node entries", (int32_t)taosArrayGetSize(pValidNodeEntryList)); taosArrayDestroy(pRemovedTasks); return 0; } @@ -2347,7 +2365,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { return 0; } - bool allVnodeReady = true; + bool allVnodeReady = true; SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVnodeReady); if (!allVnodeReady) { taosArrayDestroy(pNodeSnapshot); @@ -2361,7 +2379,6 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot); if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { - // kill current active checkpoint transaction, since the transaction is vnode wide. doKillActiveCheckpointTrans(pMnode); code = mndProcessVgroupChange(pMnode, &changeInfo); @@ -2396,7 +2413,7 @@ typedef struct SMStreamNodeCheckMsg { static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; if (sdbGetSize(pSdb, SDB_STREAM) <= 0) { return 0; } @@ -2420,7 +2437,7 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { SStreamTask *pTask = taosArrayGetP(pLevel, j); STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); + void * p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); if (p == NULL) { STaskStatusEntry entry = {0}; streamTaskStatusInit(&entry, pTask); @@ -2434,7 +2451,7 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { } } -void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecInfo * pExecNode) { +void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { int32_t level = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < level; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); @@ -2444,12 +2461,12 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecInfo * pExecNode) { SStreamTask *pTask = taosArrayGetP(pLevel, j); STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); + void * p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); if (p != NULL) { taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id)); - for(int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { - STaskId* pId = taosArrayGet(pExecNode->pTaskList, k); + for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { + STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); if (pId->taskId == id.taskId && pId->streamId == id.streamId) { taosArrayRemove(pExecNode->pTaskList, k); mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, @@ -2457,7 +2474,6 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecInfo * pExecNode) { break; } } - } } } @@ -2487,7 +2503,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name) { return pTrans; } -int32_t createStreamResetStatusTrans(SMnode* 
pMnode, SStreamObj* pStream) { +int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { STrans *pTrans = doCreateTrans(pMnode, pStream, "stream-task-reset"); if (pTrans == NULL) { return terrno; @@ -2504,7 +2520,7 @@ int32_t createStreamResetStatusTrans(SMnode* pMnode, SStreamObj* pStream) { SStreamTask *pTask = taosArrayGetP(pLevel, k); // todo extract method, with pause stream task - SVResetStreamTaskReq* pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq)); + SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq)); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq), @@ -2550,9 +2566,9 @@ int32_t createStreamResetStatusTrans(SMnode* pMnode, SStreamObj* pStream) { int32_t doKillActiveCheckpointTrans(SMnode *pMnode) { int32_t transId = 0; - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; STrans *pTrans = NULL; - void *pIter = NULL; + void * pIter = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans); @@ -2583,13 +2599,13 @@ int32_t doKillActiveCheckpointTrans(SMnode *pMnode) { return TSDB_CODE_SUCCESS; } -int32_t mndResetFromCheckpoint(SMnode* pMnode) { +int32_t mndResetFromCheckpoint(SMnode *pMnode) { doKillActiveCheckpointTrans(pMnode); // set all tasks status to be normal, refactor later to be stream level, instead of vnode level. - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; - void *pIter = NULL; + void * pIter = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) { @@ -2608,15 +2624,15 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) { return 0; } -int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) { +int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { int32_t num = taosArrayGetSize(pNodeList); for (int k = 0; k < num; ++k) { - int32_t* pVgId = taosArrayGet(pNodeList, k); + int32_t *pVgId = taosArrayGet(pNodeList, k); int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList); for (int i = 0; i < numOfNodes; ++i) { - SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i); + SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i); if (pNodeEntry->nodeId == *pVgId) { mInfo("vgId:%d expired in stream task, needs update nodeEp", *pVgId); @@ -2629,12 +2645,11 @@ int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) { return TSDB_CODE_SUCCESS; } -static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) { +static void updateStageInfo(STaskStatusEntry *pTaskEntry, int32_t stage) { int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList); - for(int32_t j = 0; j < numOfNodes; ++j) { - SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j); + for (int32_t j = 0; j < numOfNodes; ++j) { + SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j); if (pNodeEntry->nodeId == pTaskEntry->nodeId) { - mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId, pTaskEntry->stage, stage, pTaskEntry->id.taskId); @@ -2646,7 +2661,7 @@ static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) { } int32_t mndProcessStreamHb(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; + SMnode * pMnode = pReq->info.node; SStreamHbMsg req = {0}; bool checkpointFailed = false; @@ -2699,15 +2714,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { pTaskEntry->status = p->status; if (p->status != TASK_STATUS__READY) { 
- mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status)); + mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status)); } } // current checkpoint is failed, rollback from the checkpoint trans // kill the checkpoint trans and then set all tasks status to be normal if (checkpointFailed && activeCheckpointId != 0) { - bool allReady = true; - SArray* p = mndTakeVgroupSnapshot(pMnode, &allReady); + bool allReady = true; + SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); taosArrayDestroy(p); if (allReady) { From 5ac66679db18c91006eda9741a3f73d31b5d4cf1 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 9 Nov 2023 10:17:46 +0800 Subject: [PATCH 14/28] config/s3blocksize: move range check from global to mnode --- source/common/src/tglobal.c | 2 +- source/dnode/mnode/impl/src/mndDnode.c | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ead9a5926b..8bb2fa3ab7 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1674,7 +1674,7 @@ void taosCfgDynamicOptions(const char *option, const char *value) { {"ttlBatchDropNum", &tsTtlBatchDropNum}, {"ttlFlushThreshold", &tsTtlFlushThreshold}, {"ttlPushInterval", &tsTtlPushIntervalSec}, - {"s3BlockSize", &tsS3BlockSize}, + //{"s3BlockSize", &tsS3BlockSize}, {"s3BlockCacheSize", &tsS3BlockCacheSize}, {"s3PageCacheSize", &tsS3PageCacheSize}, {"s3UploadDelaySec", &tsS3UploadDelaySec}, diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 85e4ef0fc2..f4108b52c6 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -1310,6 +1310,22 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { } tFreeSMCfgDnodeReq(&cfgReq); return 0; + } else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) { + int32_t optLen = strlen("s3blocksize"); + int32_t flag = -1; + int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag); + if (code < 0) return code; + + if (flag > 1024 * 1024 || (flag > -1 && flag < 4) || flag < -1) { + mError("dnode:%d, failed to config s3blocksize since value:%d. 
Valid range: -1 or [4, 1024 * 1024]", + cfgReq.dnodeId, flag); + terrno = TSDB_CODE_INVALID_CFG; + tFreeSMCfgDnodeReq(&cfgReq); + return -1; + } + + strcpy(dcfgReq.config, "s3blocksize"); + snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); #endif } else { mndMCfg2DCfg(&cfgReq, &dcfgReq); From 50666987f00526b2815896a4b9b7349c589890e4 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 9 Nov 2023 11:23:17 +0800 Subject: [PATCH 15/28] change chkpid gen way --- source/libs/stream/src/streamSnapshot.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 3de5de9967..5f72129ebd 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -19,7 +19,6 @@ #include "streamBackendRocksdb.h" #include "streamInt.h" #include "tcommon.h" -#include "streamInt.h" enum SBackendFileType { ROCKSDB_OPTIONS_TYPE = 1, @@ -193,8 +192,8 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk taosArrayPush(pFile->pSst, &sst); } } - { - char* buf = taosMemoryCalloc(1, 512); + if (qDebugFlag & DEBUG_TRACE) { + char* buf = taosMemoryCalloc(1, 128 + taosArrayGetSize(pFile->pSst) * 16); sprintf(buf, "[current: %s,", pFile->pCurrent); sprintf(buf + strlen(buf), "MANIFEST: %s,", pFile->pMainfest); sprintf(buf + strlen(buf), "options: %s,", pFile->pOptions); @@ -344,10 +343,10 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); - if(buf == NULL){ + if (buf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); + int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); if (nread == -1) { taosMemoryFree(buf); code = TAOS_SYSTEM_ERROR(terrno); @@ -480,8 +479,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa } int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) { SStreamSnapHandle* handle = &pWriter->handle; - if (qDebugFlag & DEBUG_DEBUG) { - char* buf = (char*)taosMemoryMalloc(1024); + if (qDebugFlag & DEBUG_TRACE) { + char* buf = (char*)taosMemoryMalloc(128 + taosArrayGetSize(handle->pFileList) * 16); int n = sprintf(buf, "["); for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) { SBackendFileItem* item = taosArrayGet(handle->pFileList, i); From b80770dea8a83ffdc0d0c89ac12e5dcf53d3eea4 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 9 Nov 2023 11:46:45 +0800 Subject: [PATCH 16/28] fix: close vnode in the failed mode properly in vmCloseVnode --- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 7 ++++++- source/dnode/vnode/src/tsdb/tsdbOpen.c | 2 ++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index d2093ff77c..7a2bd0f847 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -144,6 +144,10 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) char path[TSDB_FILENAME_LEN] = {0}; bool atExit = true; + if (pVnode->failed) { + ASSERT(pVnode->pImpl == 
NULL); + goto _closed; + } if (vnodeIsLeader(pVnode->pImpl)) { vnodeProposeCommitOnNeed(pVnode->pImpl, atExit); } @@ -202,6 +206,8 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) vnodeClose(pVnode->pImpl); pVnode->pImpl = NULL; + +_closed: dInfo("vgId:%d, vnode is closed", pVnode->vgId); if (commitAndRemoveWal) { @@ -386,7 +392,6 @@ static void *vmCloseVnodeInThread(void *param) { for (int32_t v = 0; v < pThread->vnodeNum; ++v) { SVnodeObj *pVnode = pThread->ppVnodes[v]; - if (pVnode->failed) continue; char stepDesc[TSDB_STEP_DESC_LEN] = {0}; snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId, diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index c32b2eedd7..a1f864814f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -89,6 +89,8 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee return 0; _err: + tsdbCloseFS(&pTsdb->pFS); + taosThreadMutexDestroy(&pTsdb->mutex); taosMemoryFree(pTsdb); return -1; } From 15b73354f8fb1dd5578d3172e76f1fd2e7d7e8c3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 9 Nov 2023 13:59:21 +0800 Subject: [PATCH 17/28] change chkpid gen way --- source/dnode/mnode/impl/src/mndStream.c | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index eaeed579e5..135aab285b 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -715,12 +715,10 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) } static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { - int32_t code = -1; - - SMnode * pMnode = pReq->info.node; - SStreamObj *pStream = NULL; - SDbObj * pDb = NULL; - + SMnode * pMnode = pReq->info.node; + int32_t code = -1; + SStreamObj * pStream = NULL; + SDbObj * pDb = NULL; SCMCreateStreamReq createStreamReq = {0}; SStreamObj streamObj = {0}; @@ -883,13 +881,12 @@ int64_t mndStreamGenChkpId(SMnode *pMnode) { SStreamObj *pStream = NULL; void * pIter = NULL; SSdb * pSdb = pMnode->pSdb; - - int64_t maxChkpId = 0; + int64_t maxChkpId = 0; while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; - maxChkpId = MAX(maxChkpId, pStream->checkpointId); + maxChkpId = TMAX(maxChkpId, pStream->checkpointId); sdbRelease(pSdb, pStream); } return maxChkpId + 1; From 8b6fc10bbd6313d40fe94bb443135197b16c01ab Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 9 Nov 2023 13:59:39 +0800 Subject: [PATCH 18/28] config/block-size: make range > 0 --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 8bb2fa3ab7..eb0059676e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -733,7 +733,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -100, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) + if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, 1024, 1024 * 
1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) From 17bddf5ff49128cb30fd0737708990c722444bad Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 9 Nov 2023 14:09:58 +0800 Subject: [PATCH 19/28] cos: use uError instead of vError --- source/common/src/cos.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 7d8175637b..0b6b0db885 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -794,7 +794,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, } if (check && cbd.buf_pos != size) { - vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); + uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); return TAOS_SYSTEM_ERROR(EIO); } From e88fb845088bdfcfebd4cab04f7251f9f9c51742 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 9 Nov 2023 14:15:01 +0800 Subject: [PATCH 20/28] config/block-size fix --- source/common/src/tglobal.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index eab4c2ea77..b66d811d10 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -128,12 +128,12 @@ char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = ""; // int32_t tsSmlBatchSize = 10000; // checkpoint backup -char tsSnodeAddress[TSDB_FQDN_LEN] = {0}; +char tsSnodeAddress[TSDB_FQDN_LEN] = {0}; int32_t tsRsyncPort = 873; #ifdef WINDOWS char tsCheckpointBackupDir[PATH_MAX] = "C:\\TDengine\\data\\backup\\checkpoint\\"; #else -char tsCheckpointBackupDir[PATH_MAX] = "/var/lib/taos/backup/checkpoint/"; +char tsCheckpointBackupDir[PATH_MAX] = "/var/lib/taos/backup/checkpoint/"; #endif // tmq @@ -335,7 +335,7 @@ int32_t taosSetS3Cfg(SConfig *pCfg) { } if (tsS3BucketName[0] != '<') { #if defined(USE_COS) || defined(USE_S3) - if(tsDiskCfgNum > 1) tsS3Enabled = true; + if (tsDiskCfgNum > 1) tsS3Enabled = true; tsS3StreamEnabled = true; #endif } @@ -678,7 +678,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "rsyncPort", tsRsyncPort, 1, 65535, CFG_SCOPE_BOTH, CFG_DYN_SERVER) != 0) return -1; if (cfgAddString(pCfg, "snodeAddress", tsSnodeAddress, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; - if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) + return -1; if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; @@ -748,7 +749,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, 1024, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) + if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -1, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) From e60f69a7ed314800df0805a42f4c88106c79a4e1 
Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 9 Nov 2023 14:20:27 +0800 Subject: [PATCH 21/28] fix: code typo --- source/dnode/vnode/src/tsdb/tsdbCommit2.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 46e3aff0d4..22fb3b84ec 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -192,7 +192,7 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) { committer->ctx->tbid->uid = record->uid; if (metaGetInfo(committer->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) { - code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid); + code = tsdbIterMergerSkipTableData(committer->tombIterMerger, committer->ctx->tbid); TSDB_CHECK_CODE(code, lino, _exit); continue; } From c1f935bd124956fd83cf6e5e1772c367330296bf Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 9 Nov 2023 14:49:04 +0800 Subject: [PATCH 22/28] fix: const value replace issue --- source/libs/parser/src/parCalcConst.c | 11 +++++++++-- tests/parallel_test/cases.task | 1 + tests/script/tsim/query/const.sim | 11 +++++++++++ 3 files changed, 21 insertions(+), 2 deletions(-) create mode 100644 tests/script/tsim/query/const.sim diff --git a/source/libs/parser/src/parCalcConst.c b/source/libs/parser/src/parCalcConst.c index 0657f1a43d..441f4da3b1 100644 --- a/source/libs/parser/src/parCalcConst.c +++ b/source/libs/parser/src/parCalcConst.c @@ -176,12 +176,15 @@ static int32_t calcConstStmtCondition(SCalcConstContext* pCxt, SNode** pCond, bo static EDealRes doFindAndReplaceNode(SNode** pNode, void* pContext) { SCalcConstContext* pCxt = pContext; if (pCxt->replaceCxt.pTarget == *pNode) { + char aliasName[TSDB_COL_NAME_LEN] = {0}; + strcpy(aliasName, ((SExprNode*)*pNode)->aliasName); nodesDestroyNode(*pNode); *pNode = nodesCloneNode(pCxt->replaceCxt.pNew); if (NULL == *pNode) { pCxt->code = TSDB_CODE_OUT_OF_MEMORY; return DEAL_RES_ERROR; } + strcpy(((SExprNode*)*pNode)->aliasName, aliasName); pCxt->replaceCxt.replaced = true; return DEAL_RES_END; @@ -211,7 +214,6 @@ static int32_t calcConstProject(SCalcConstContext* pCxt, SNode* pProject, bool d } char aliasName[TSDB_COL_NAME_LEN] = {0}; - strcpy(aliasName, ((SExprNode*)pProject)->aliasName); int32_t code = TSDB_CODE_SUCCESS; if (dual) { code = scalarCalculateConstantsFromDual(pProject, pNew); @@ -219,15 +221,20 @@ static int32_t calcConstProject(SCalcConstContext* pCxt, SNode* pProject, bool d code = scalarCalculateConstants(pProject, pNew); } if (TSDB_CODE_SUCCESS == code) { - strcpy(((SExprNode*)*pNew)->aliasName, aliasName); if (QUERY_NODE_VALUE == nodeType(*pNew) && NULL != pAssociation) { int32_t size = taosArrayGetSize(pAssociation); for (int32_t i = 0; i < size; ++i) { SAssociationNode* pAssNode = taosArrayGet(pAssociation, i); SNode** pCol = pAssNode->pPlace; if (*pCol == pAssNode->pAssociationNode) { + strcpy(aliasName, ((SExprNode*)*pCol)->aliasName); + SArray* pOrigAss = NULL; + TSWAP(((SExprNode*)*pCol)->pAssociation, pOrigAss); nodesDestroyNode(*pCol); *pCol = nodesCloneNode(*pNew); + TSWAP(pOrigAss, ((SExprNode*)*pCol)->pAssociation); + taosArrayDestroy(pOrigAss); + strcpy(((SExprNode*)*pCol)->aliasName, aliasName); if (NULL == *pCol) { code = TSDB_CODE_OUT_OF_MEMORY; break; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 28aa8744fd..9fd3625b4c 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1294,6 
+1294,7 @@ e ,,y,script,./test.sh -f tsim/tagindex/add_index.sim ,,n,script,./test.sh -f tsim/tagindex/sma_and_tag_index.sim ,,y,script,./test.sh -f tsim/view/view.sim +,,y,script,./test.sh -f tsim/query/const.sim #develop test diff --git a/tests/script/tsim/query/const.sim b/tests/script/tsim/query/const.sim new file mode 100644 index 0000000000..08f2b909c1 --- /dev/null +++ b/tests/script/tsim/query/const.sim @@ -0,0 +1,11 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +sql select b.z from (select c.a as z from (select 'a' as a) c) b; +if $rows != 1 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT From 386c25a99a71092b8c7dd641e5ba2286415408d9 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 9 Nov 2023 15:24:19 +0800 Subject: [PATCH 23/28] tsdb/retention: remove file when ref's clear --- source/dnode/vnode/src/tsdb/tsdbFile2.c | 13 ++++++++++++- source/dnode/vnode/src/tsdb/tsdbRetention.c | 12 ++++++------ 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbFile2.c b/source/dnode/vnode/src/tsdb/tsdbFile2.c index 9edb03d35b..bf3357dabb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile2.c @@ -14,6 +14,7 @@ */ #include "tsdbFile2.h" +#include "cos.h" // to_json static int32_t head_to_json(const STFile *file, cJSON *json); @@ -44,7 +45,17 @@ static const struct { void remove_file(const char *fname) { int32_t code = taosRemoveFile(fname); if (code) { - tsdbError("file:%s remove failed", fname); + if (tsS3Enabled) { + const char *object_name = taosDirEntryBaseName((char *)fname); + long s3_size = tsS3Enabled ? s3Size(object_name) : 0; + if (!strncmp(fname + strlen(fname) - 5, ".data", 5) && s3_size > 0) { + s3DeleteObjects(&object_name, 1); + } else { + tsdbError("file:%s remove failed", fname); + } + } else { + tsdbError("file:%s remove failed", fname); + } } else { tsdbInfo("file:%s is removed", fname); } diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 1908f16529..0a41ac3cc8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -13,9 +13,9 @@ * along with this program. If not, see . 
*/ +#include "cos.h" #include "tsdb.h" #include "tsdbFS2.h" -#include "cos.h" #include "vnd.h" typedef struct { @@ -292,15 +292,15 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) { if (expLevel < 0) { // remove the fileset for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = fset->farr[ftype], 1); ++ftype) { if (fobj == NULL) continue; - + /* int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs); if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) { code = tsdbRemoveFileObjectS3(rtner, fobj); TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbDoRemoveFileObject(rtner, fobj); - TSDB_CHECK_CODE(code, lino, _exit); - } + } else {*/ + code = tsdbDoRemoveFileObject(rtner, fobj); + TSDB_CHECK_CODE(code, lino, _exit); + //} } SSttLvl *lvl; From a6600ab23a1e3f794deab6a84da6ff31cf5c0052 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Thu, 9 Nov 2023 14:23:42 +0800 Subject: [PATCH 24/28] correct colid in blockinfo --- include/libs/nodes/plannodes.h | 1 + source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 5 +-- source/libs/executor/src/cachescanoperator.c | 17 ++++++++- source/libs/nodes/src/nodesCodeFuncs.c | 7 ++++ source/libs/nodes/src/nodesMsgFuncs.c | 9 ++++- source/libs/nodes/src/nodesUtilFuncs.c | 1 + source/libs/planner/src/planPhysiCreater.c | 1 + tests/system-test/2-query/last_cache_scan.py | 36 ++++++++++++++++++++ 8 files changed, 73 insertions(+), 4 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index dbbe1d92dc..4ffcb616dd 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -388,6 +388,7 @@ typedef struct SLastRowScanPhysiNode { SNodeList* pGroupTags; bool groupSort; bool ignoreNull; + SNodeList* pTargets; } SLastRowScanPhysiNode; typedef SLastRowScanPhysiNode STableCountScanPhysiNode; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 2909b550d7..b6aa791cf0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -30,10 +30,12 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { uint64_t ts = 0; SFirstLastRes* p; + col_id_t colId; for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]); int32_t slotId = slotIds[i]; SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i); + colId = pColVal->colVal.cid; p = (SFirstLastRes*)varDataVal(pRes[i]); p->ts = pColVal->ts; @@ -63,8 +65,7 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP) { colDataSetVal(pCol, numOfRows, (const char*)&ts, false); continue; - } - if (pReader->numOfCols == 1 && dstSlotIds[0] != idx) { + } else if (pReader->numOfCols == 1 && idx != dstSlotIds[0] && pCol->info.colId == colId) { if (!p->isNull) { colDataSetVal(pCol, numOfRows, p->buf, false); } else { diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index a7b4fe02f6..6d59698855 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -54,6 +54,19 @@ static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColM #define SCAN_ROW_TYPE(_t) ((_t) ? 
CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW) +static void setColIdForCacheReadBlock(SSDataBlock* pBlock, SNodeList* pTargets) { + SNode* pNode; + int32_t idx = 0; + FOREACH(pNode, pTargets) { + if (nodeType(pNode) == QUERY_NODE_COLUMN) { + SColumnNode* pCol = (SColumnNode*)pNode; + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, idx); + pColInfo->info.colId = pCol->colId; + } + idx++; + } +} + SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { int32_t code = TSDB_CODE_SUCCESS; @@ -114,10 +127,12 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe capacity = TMIN(totalTables, 4096); pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false); + setColIdForCacheReadBlock(pInfo->pBufferredRes, pScanNode->pTargets); blockDataEnsureCapacity(pInfo->pBufferredRes, capacity); } else { // by tags pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull); capacity = 1; // only one row output + setColIdForCacheReadBlock(pInfo->pRes, pScanNode->pTargets); } initResultSizeInfo(&pOperator->resultInfo, capacity); @@ -192,7 +207,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferredRes->pDataBlock); ++i) { - SColumnInfoData* pCol = taosArrayGet(pInfo->pBufferredRes->pDataBlock, i); + SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i); int32_t slotId = pCol->info.slotId; SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index c97c920a3b..c9b49ee30f 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1773,6 +1773,7 @@ static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) { static const char* jkLastRowScanPhysiPlanGroupTags = "GroupTags"; static const char* jkLastRowScanPhysiPlanGroupSort = "GroupSort"; +static const char* jkLastRowScanPhysiPlanTargets = "Targets"; static int32_t physiLastRowScanNodeToJson(const void* pObj, SJson* pJson) { const SLastRowScanPhysiNode* pNode = (const SLastRowScanPhysiNode*)pObj; @@ -1784,6 +1785,9 @@ static int32_t physiLastRowScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkLastRowScanPhysiPlanGroupSort, pNode->groupSort); } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkLastRowScanPhysiPlanTargets, pNode->pTargets); + } return code; } @@ -1798,6 +1802,9 @@ static int32_t jsonToPhysiLastRowScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkLastRowScanPhysiPlanGroupSort, &pNode->groupSort); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkLastRowScanPhysiPlanTargets, &pNode->pTargets); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 99100b2a1d..ea59d93d7f 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2052,7 +2052,8 @@ enum { PHY_LAST_ROW_SCAN_CODE_SCAN = 1, PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS, PHY_LAST_ROW_SCAN_CODE_GROUP_SORT, - PHY_LAST_ROW_SCAN_CODE_IGNULL + PHY_LAST_ROW_SCAN_CODE_IGNULL, + PHY_LAST_ROW_SCAN_CODE_TARGETS }; static int32_t physiLastRowScanNodeToMsg(const 
void* pObj, STlvEncoder* pEncoder) { @@ -2068,6 +2069,9 @@ static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeBool(pEncoder, PHY_LAST_ROW_SCAN_CODE_IGNULL, pNode->ignoreNull); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_LAST_ROW_SCAN_CODE_TARGETS, nodeListToMsg, pNode->pTargets); + } return code; } @@ -2091,6 +2095,9 @@ static int32_t msgToPhysiLastRowScanNode(STlvDecoder* pDecoder, void* pObj) { case PHY_LAST_ROW_SCAN_CODE_IGNULL: code = tlvDecodeBool(pTlv, &pNode->ignoreNull); break; + case PHY_LAST_ROW_SCAN_CODE_TARGETS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); + break; default: break; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 4f6d3d95e1..ee22caf574 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1285,6 +1285,7 @@ void nodesDestroyNode(SNode* pNode) { SLastRowScanPhysiNode* pPhyNode = (SLastRowScanPhysiNode*)pNode; destroyScanPhysiNode((SScanPhysiNode*)pNode); nodesDestroyList(pPhyNode->pGroupTags); + nodesDestroyList(pPhyNode->pTargets); break; } case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index d6799a25a7..5cf3426e6f 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -552,6 +552,7 @@ static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSu if (NULL == pScan) { return TSDB_CODE_OUT_OF_MEMORY; } + pScan->pTargets = nodesCloneList(pScanLogicNode->node.pTargets); pScan->pGroupTags = nodesCloneList(pScanLogicNode->pGroupTags); if (NULL != pScanLogicNode->pGroupTags && NULL == pScan->pGroupTags) { diff --git a/tests/system-test/2-query/last_cache_scan.py b/tests/system-test/2-query/last_cache_scan.py index e75729f960..0f0936ebab 100644 --- a/tests/system-test/2-query/last_cache_scan.py +++ b/tests/system-test/2-query/last_cache_scan.py @@ -283,6 +283,42 @@ class TDTestCase: tdSql.checkData(0, 3, 1001) tdSql.checkData(0, 4, "2018-11-25 19:30:00.000") + sql_template = 'select %s from meters partition by tbname' + select_items = ["ts, last(c10), c10, ts", "ts, ts, last(c10), c10, tbname", "last(c10), c10, ts"] + has_last_row_scan_res = [1,1,1] + sqls = self.format_sqls(sql_template, select_items) + self.explain_and_check_res(sqls, has_last_row_scan_res) + tdSql.query(sqls[0], queryTimes=1) + tdSql.checkRows(10) + tdSql.checkData(0,0, '2018-11-25 19:30:00.000') + tdSql.checkData(0,1, '2018-11-25 19:30:01.000') + tdSql.checkData(0,2, '2018-11-25 19:30:01.000') + tdSql.checkData(0,3, '2018-11-25 19:30:00.000') + + tdSql.query(sqls[1], queryTimes=1) + tdSql.checkRows(10) + tdSql.checkData(0,0, '2018-11-25 19:30:00.000') + tdSql.checkData(0,1, '2018-11-25 19:30:00.000') + tdSql.checkData(0,2, '2018-11-25 19:30:01.000') + tdSql.checkData(0,3, '2018-11-25 19:30:01.000') + + sql_template = 'select %s from meters partition by t1' + select_items = ["ts, last(c10), c10, ts", "ts, ts, last(c10), c10, t1", "last(c10), c10, ts"] + has_last_row_scan_res = [1,1,1] + sqls = self.format_sqls(sql_template, select_items) + self.explain_and_check_res(sqls, has_last_row_scan_res) + tdSql.query(sqls[0], queryTimes=1) + tdSql.checkRows(5) + tdSql.checkData(0,0, '2018-11-25 19:30:00.000') + tdSql.checkData(0,1, '2018-11-25 19:30:01.000') + tdSql.checkData(0,2, '2018-11-25 19:30:01.000') + 
tdSql.checkData(0,3, '2018-11-25 19:30:00.000') + + tdSql.query("select ts, last(c10), t1, t2 from meters partition by t1, t2") + tdSql.checkRows(10) + tdSql.checkData(0, 0, '2018-11-25 19:30:00.000') + tdSql.checkData(0, 1, '2018-11-25 19:30:01.000') + def run(self): self.prepareTestEnv() #time.sleep(99999999) From 73f1c55a47024bd634db9f3a2d698f1b09d586c9 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 9 Nov 2023 17:23:06 +0800 Subject: [PATCH 25/28] fix: remove vnode obj from hash --- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 7a2bd0f847..21b791eb4d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -144,11 +144,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) char path[TSDB_FILENAME_LEN] = {0}; bool atExit = true; - if (pVnode->failed) { - ASSERT(pVnode->pImpl == NULL); - goto _closed; - } - if (vnodeIsLeader(pVnode->pImpl)) { + if (pVnode->pImpl && vnodeIsLeader(pVnode->pImpl)) { vnodeProposeCommitOnNeed(pVnode->pImpl, atExit); } @@ -157,6 +153,10 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) taosThreadRwlockUnlock(&pMgmt->lock); vmReleaseVnode(pMgmt, pVnode); + if (pVnode->failed) { + ASSERT(pVnode->pImpl == NULL); + goto _closed; + } dInfo("vgId:%d, pre close", pVnode->vgId); vnodePreClose(pVnode->pImpl); From d3cf6a4340b7f919c2a99f15cd9949f3655811cf Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 9 Nov 2023 17:44:24 +0800 Subject: [PATCH 26/28] tsdb/file: new nlevel field for remove --- source/dnode/vnode/src/tsdb/tsdbFS2.c | 7 ++++--- source/dnode/vnode/src/tsdb/tsdbFile2.c | 9 +++++---- source/dnode/vnode/src/tsdb/tsdbFile2.h | 1 + 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 348397272d..02ef75ae86 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -22,7 +22,7 @@ extern int vnodeScheduleTask(int (*execute)(void *), void *arg); extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg); -extern void remove_file(const char *fname); +extern void remove_file(const char *fname, bool last_level); #define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT #define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1) @@ -532,7 +532,8 @@ static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) { if (taosIsDir(file->aname)) continue; if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) { - remove_file(file->aname); + int32_t nlevel = tfsGetLevel(fs->tsdb->pVnode->pTfs); + remove_file(file->aname, nlevel > 1 && file->did.level == nlevel - 1); } } @@ -1282,4 +1283,4 @@ int32_t tsdbFSEnableBgTask(STFileSystem *fs) { fs->stop = false; taosThreadMutexUnlock(&fs->tsdb->mutex); return 0; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tsdb/tsdbFile2.c b/source/dnode/vnode/src/tsdb/tsdbFile2.c index bf3357dabb..cc05b8ee18 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile2.c @@ -42,10 +42,10 @@ static const struct { [TSDB_FTYPE_STT] = {"stt", stt_to_json, stt_from_json}, }; -void remove_file(const char *fname) { +void remove_file(const char *fname, bool last_level) { int32_t code = taosRemoveFile(fname); if (code) { - if (tsS3Enabled) { + if (tsS3Enabled && last_level) { const char *object_name 
= taosDirEntryBaseName((char *)fname); long s3_size = tsS3Enabled ? s3Size(object_name) : 0; if (!strncmp(fname + strlen(fname) - 5, ".data", 5) && s3_size > 0) { @@ -235,6 +235,7 @@ int32_t tsdbTFileObjInit(STsdb *pTsdb, const STFile *f, STFileObj **fobj) { fobj[0]->state = TSDB_FSTATE_LIVE; fobj[0]->ref = 1; tsdbTFileName(pTsdb, f, fobj[0]->fname); + fobj[0]->nlevel = tfsGetLevel(pTsdb->pVnode->pTfs); return 0; } @@ -256,7 +257,7 @@ int32_t tsdbTFileObjUnref(STFileObj *fobj) { tsdbTrace("unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef); if (nRef == 0) { if (fobj->state == TSDB_FSTATE_DEAD) { - remove_file(fobj->fname); + remove_file(fobj->fname, fobj->nlevel > 1 && fobj->f->did.level == fobj->nlevel - 1); } taosMemoryFree(fobj); } @@ -272,7 +273,7 @@ int32_t tsdbTFileObjRemove(STFileObj *fobj) { taosThreadMutexUnlock(&fobj->mutex); tsdbTrace("remove unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef); if (nRef == 0) { - remove_file(fobj->fname); + remove_file(fobj->fname, fobj->nlevel > 1 && fobj->f->did.level == fobj->nlevel - 1); taosMemoryFree(fobj); } return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbFile2.h b/source/dnode/vnode/src/tsdb/tsdbFile2.h index 9da198c1f0..b94f7a9fd0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFile2.h @@ -76,6 +76,7 @@ struct STFileObj { STFile f[1]; int32_t state; int32_t ref; + int32_t nlevel; char fname[TSDB_FILENAME_LEN]; }; From 2fbf0a532dd71428e386433ed4d512bee19c81e2 Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 9 Nov 2023 09:51:35 +0000 Subject: [PATCH 27/28] fix/TD-27243 --- source/dnode/vnode/src/vnd/vnodeSvr.c | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index c6c93e3d3f..eadfd39d0b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -928,6 +928,17 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, goto _exit; } + if(tsEnableAuditCreateTable){ + char* str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); + if (str == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + rcode = -1; + goto _exit; + } + strcpy(str, pCreateReq->name); + taosArrayPush(tbNames, &str); + } + // validate hash sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name); if (vnodeValidateTableHash(pVnode, tbName) < 0) { @@ -951,12 +962,6 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, } taosArrayPush(rsp.pArray, &cRsp); - - if (tsEnableAuditCreateTable) { - char *str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); - strcpy(str, pCreateReq->name); - taosArrayPush(tbNames, &str); - } } vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids)); @@ -985,10 +990,10 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB); SStringBuilder sb = {0}; - for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { - char **key = (char **)taosArrayGet(tbNames, iReq); + for(int32_t i = 0; i < tbNames->size; i++){ + char** key = (char**)taosArrayGet(tbNames, i); taosStringBuilderAppendStringLen(&sb, *key, strlen(*key)); - if (iReq < req.nReqs - 1) { + if(i < tbNames->size - 1){ taosStringBuilderAppendChar(&sb, ','); } taosMemoryFreeClear(*key); From 32528ab9cd003a3dadd5e28abaf3ce4a94f91f29 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 10 Nov 
2023 12:44:09 +0800 Subject: [PATCH 28/28] fix(tsdb/reader-writer): fix pgnoEnd calc --- source/dnode/vnode/src/tsdb/tsdbReaderWriter.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index adb72821e4..8b9cae42fc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -338,7 +338,7 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64 // 2, retrieve pgs from s3 uint8_t *pBlock = NULL; int64_t retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage); - int64_t pgnoEnd = pgno - 1 + (size - n + szPgCont - 1) / szPgCont; + int64_t pgnoEnd = pgno - 1 + (bOffset + size - n + szPgCont - 1) / szPgCont; int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage; code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, 1, &pBlock); if (code != TSDB_CODE_SUCCESS) {