From 765111daf368195b40dc7f6f44a52f1fbde28fb3 Mon Sep 17 00:00:00 2001 From: "chao.feng" Date: Mon, 5 Jun 2023 14:56:19 +0800 Subject: [PATCH] update the ci test cases for user privilege by charles --- .../0-others/user_privilege_show.py | 267 ++++++++++++++++++ 1 file changed, 267 insertions(+) create mode 100644 tests/system-test/0-others/user_privilege_show.py diff --git a/tests/system-test/0-others/user_privilege_show.py b/tests/system-test/0-others/user_privilege_show.py new file mode 100644 index 0000000000..9f49778ba8 --- /dev/null +++ b/tests/system-test/0-others/user_privilege_show.py @@ -0,0 +1,267 @@ +from itertools import product +import taos +from taos.tmq import * +from util.cases import * +from util.common import * +from util.log import * +from util.sql import * +from util.sqlset import * + + +class TDTestCase: + """This test case is used to veirfy the show create stable/table command for + the different user privilege(TS-3469) + """ + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + # init the tdsql + tdSql.init(conn.cursor()) + self.setsql = TDSetSql() + # user info + self.username = 'test' + self.password = 'test' + # db info + self.dbname = "user_privilege_show" + self.stbname = 'stb' + self.common_tbname = "tb" + self.ctbname_list = ["ct1", "ct2"] + self.column_dict = { + 'ts': 'timestamp', + 'col1': 'float', + 'col2': 'int', + } + self.tag_dict = { + 'ctbname': 'binary(10)' + } + + # privilege check scenario info + self.privilege_check_dic = {} + self.senario_type = ["stable", "table", "ctable"] + self.priv_type = ["read", "write", "all", "none"] + # stable senarios + # include the show stable xxx command test senarios and expect result, true as have privilege, false as no privilege + # the list element is (db_privilege, stable_privilege, expect_res) + st_senarios_list = [] + for senario in list(product(self.priv_type, repeat=2)): + expect_res = True + if senario == ("write", "write") or senario == ("none", "none") or senario == ("none", "write") or senario == ("write", "none"): + expect_res = False + st_senarios_list.append(senario + (expect_res,)) + # self.privilege_check_dic["stable"] = st_senarios_list + + # table senarios + # the list element is (db_privilege, table_privilege, expect_res) + self.privilege_check_dic["table"] = st_senarios_list + + # child table senarios + # the list element is (db_privilege, stable_privilege, ctable_privilege, expect_res) + ct_senarios_list = [] + for senario in list(product(self.priv_type, repeat=3)): + expect_res = True + if senario[2] == "write" or (senario[2] == "none" and senario[1] == "write") or (senario[2] == "none" and senario[1] == "none" and senario[0] == "write"): + expect_res = False + ct_senarios_list.append(senario + (expect_res,)) + self.privilege_check_dic["ctable"] = ct_senarios_list + + def prepare_data(self, senario_type): + """Create the db and data for test + """ + if senario_type == "stable": + # db name + self.dbname = self.dbname + '_stable' + elif senario_type == "table": + # db name + self.dbname = self.dbname + '_table' + else: + # db name + self.dbname = self.dbname + '_ctable' + + # create datebase + tdSql.execute(f"create database {self.dbname}") + tdLog.debug("sql:" + f"create database {self.dbname}") + tdSql.execute(f"use {self.dbname}") + tdLog.debug("sql:" + f"use {self.dbname}") + + # create tables + if "_stable" in self.dbname: + # create stable + tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict)) + tdLog.debug("Create stable {} successfully".format(self.stbname)) + elif "_table" in self.dbname: + # create common table + tdSql.execute(f"create table {self.common_tbname}(ts timestamp, col1 float, col2 int)") + tdLog.debug("sql:" + f"create table {self.common_tbname}(ts timestamp, col1 float, col2 int)") + else: + # create stable and child table + tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict)) + tdLog.debug("Create stable {} successfully".format(self.stbname)) + for ctname in self.ctbname_list: + tdSql.execute(f"create table {ctname} using {self.stbname} tags('{ctname}')") + tdLog.debug("sql:" + f"create table {ctname} using {self.stbname} tags('{ctname}')") + + def create_user(self): + """Create the user for test + """ + tdSql.execute(f'create user {self.username} pass "{self.password}"') + tdLog.debug("sql:" + f'create user {self.username} pass "{self.password}"') + + def grant_privilege(self, username, privilege, privilege_obj, ctable_include=False, tag_condition=None): + """Add the privilege for the user + """ + try: + if ctable_include and tag_condition: + tdSql.execute(f'grant {privilege} on {self.dbname}.{privilege_obj} with {tag_condition} to {username}') + tdLog.debug("sql:" + f'grant {privilege} on {self.dbname}.{privilege_obj} with {tag_condition} to {username}') + else: + tdSql.execute(f'grant {privilege} on {self.dbname}.{privilege_obj} to {username}') + tdLog.debug("sql:" + f'grant {privilege} on {self.dbname}.{privilege_obj} to {username}') + except Exception as ex: + tdLog.exit(ex) + + def remove_privilege(self, username, privilege, privilege_obj, ctable_include=False, tag_condition=None): + """Remove the privilege for the user + """ + try: + if ctable_include and tag_condition: + tdSql.execute(f'revoke {privilege} on {self.dbname}.{privilege_obj} with {tag_condition} from {username}') + tdLog.debug("sql:" + f'revoke {privilege} on {self.dbname}.{privilege_obj} with {tag_condition} from {username}') + else: + tdSql.execute(f'revoke {privilege} on {self.dbname}.{privilege_obj} from {username}') + tdLog.debug("sql:" + f'revoke {privilege} on {self.dbname}.{privilege_obj} from {username}') + except Exception as ex: + tdLog.exit(ex) + + def run(self): + """Currently, the test case can't be executed for all of the privilege combinations cause + the table privilege isn't finished by dev team, only left one senario: + db read privilege for user and show create table command; will udpate the test case once + the table privilege function is finished + """ + self.create_user() + + # temp solution only for the db read privilege verification + self.prepare_data("table") + # grant db read privilege + self.grant_privilege(self.username, "read", "*") + # create the taos connection with -utest -ptest + testconn = taos.connect(user=self.username, password=self.password) + testconn.execute("use %s;" % self.dbname) + # show the user privileges + res = testconn.query("select * from information_schema.ins_user_privileges;") + tdLog.debug("Current information_schema.ins_user_privileges values: {}".format(res.fetch_all())) + # query execution + sql = "show create table " + self.common_tbname + ";" + tdLog.debug("sql: %s" % sql) + res = testconn.query(sql) + # query result + tdLog.debug("sql res:" + str(res.fetch_all())) + # remove the privilege + self.remove_privilege(self.username, "read", "*") + # clear env + testconn.close() + tdSql.execute(f"drop database {self.dbname}") + + """ + for senario_type in self.privilege_check_dic.keys(): + tdLog.debug(f"---------check the {senario_type} privilege----------") + self.prepare_data(senario_type) + for senario in self.privilege_check_dic[senario_type]: + # grant db privilege + if senario[0] != "none": + self.grant_privilege(self.username, senario[0], "*") + # grant stable privilege + if senario[1] != "none": + self.grant_privilege(self.username, senario[1], self.stbname if senario_type == "stable" or senario_type == "ctable" else self.common_tbname) + if senario_type == "stable" or senario_type == "table": + tdLog.debug(f"check the db privilege: {senario[0]}, (s)table privilege: {senario[1]}") + else: + if senario[2] != "none": + # grant child table privilege + self.grant_privilege(self.username, senario[2], self.stbname, True, "ctbname='ct1'") + tdLog.debug(f"check the db privilege: {senario[0]}, (s)table privilege: {senario[1]}, ctable privilege: {senario[2]}") + testconn = taos.connect(user=self.username, password=self.password) + tdLog.debug("Create taos connection with user: {}, password: {}".format(self.username, self.password)) + try: + testconn.execute("use %s;" % self.dbname) + except BaseException as ex: + if (senario_type in ["stable", "table"] and senario[0] == "none" and senario[1] == "none") or (senario_type == "ctable" and senario[0] == "none" and senario[1] == "none" and senario[2] == "none"): + continue + else: + tdLog.exit(ex) + + # query privileges for user + res = testconn.query("select * from information_schema.ins_user_privileges;") + tdLog.debug("Current information_schema.ins_user_privileges values: {}".format(res.fetch_all())) + + if senario_type == "stable" or senario_type == "table": + sql = "show create " + (("stable " + self.stbname) if senario_type == "stable" else (f"table {self.dbname}." + self.common_tbname + ";")) + if senario[2]: + tdLog.debug("sql: %s" % sql) + tdLog.debug(f"expected result: {senario[2]}") + res = testconn.query(sql) + tdLog.debug("sql res:" + res.fetch_all()) + else: + exception_flag = False + try: + tdLog.debug("sql: %s" % sql) + tdLog.debug(f"expected result: {senario[2]}") + res = testconn.query(sql) + tdLog.debug("sql res:" + res.fetch_all()) + except BaseException: + exception_flag = True + caller = inspect.getframeinfo(inspect.stack()[1][0]) + tdLog.debug(f"{caller.filename}({caller.lineno}) failed to check the db privilege {senario[0]} and stable privilege {senario[1]} failed as expected") + if not exception_flag: + pass + # tdLog.exit("The expected exception isn't occurred") + else: + sql = f"show create table {self.dbname}.{self.ctbname_list[0]};" + if senario[3]: + tdLog.debug("sql: %s" % sql) + tdLog.debug(f"expected result: {senario[3]}") + res = testconn.query(sql) + tdLog.debug(res.fetch_all()) + else: + exception_flag = False + try: + tdLog.debug("sql: %s" % sql) + tdLog.debug(f"expected result: {senario[3]}") + res = testconn.query(sql) + tdLog.debug(res.fetch_all()) + except BaseException: + exception_flag = True + caller = inspect.getframeinfo(inspect.stack()[1][0]) + tdLog.debug(f"{caller.filename}({caller.lineno}) failed to check the db privilege {senario[0]}, stable privilege {senario[1]} and ctable privilege {senario[2]} failed as expected") + if not exception_flag: + pass + # tdLog.exit("The expected exception isn't occurred") + + # remove db privilege + if senario[0] != "none": + self.remove_privilege(self.username, senario[0], "*") + # remove stable privilege + if senario[1] != "none": + self.remove_privilege(self.username, senario[1], self.stbname if senario_type == "stable" else self.common_tbname) + # remove child table privilege + if senario_type == "ctable": + if senario[2] != "none": + self.remove_privilege(self.username, senario[2], self.ctbname_list[0], True, "ctbname='ct1'") + testconn.close() + + # remove the database + tdSql.execute(f"drop database {self.dbname}") + # reset the dbname + self.dbname = "user_privilege_show" + """ + + def stop(self): + # remove the user + tdSql.execute(f'drop user {self.username}') + # close the connection + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase())