fix case
This commit is contained in:
parent
339a75c818
commit
64be1bf13b
|
@ -205,53 +205,37 @@ class TDTestCase:
|
||||||
def __grant_user_privileges(self, privilege, dbname=None, user_name="root"):
|
def __grant_user_privileges(self, privilege, dbname=None, user_name="root"):
|
||||||
return f"GRANT {privilege} ON {self.__priv_level(dbname)} TO {user_name} "
|
return f"GRANT {privilege} ON {self.__priv_level(dbname)} TO {user_name} "
|
||||||
|
|
||||||
def grant_check_read(self, user="root", passwd="taosdata"):
|
def grant_check(self, user="root", passwd="taosdata", priv=PRIVILEGES_ALL):
|
||||||
# sourcery skip: class-extract-method
|
|
||||||
with taos_connect(user=user, passwd=passwd) as user:
|
with taos_connect(user=user, passwd=passwd) as user:
|
||||||
user.query("use db")
|
user.query("use db")
|
||||||
user.query("show tables")
|
user.query("show tables")
|
||||||
user.query("select * from ct1")
|
if priv in [PRIVILEGES_ALL, PRIVILEGES_READ]:
|
||||||
user.error("insert into t1 (ts) values (now())")
|
user.query("select * from ct1")
|
||||||
|
else:
|
||||||
def grant_check_write(self, user="root", passwd="taosdata"):
|
user.error("select * from ct1")
|
||||||
with taos_connect(user=user, passwd=passwd) as user:
|
if priv in [PRIVILEGES_ALL, PRIVILEGES_WRITE]:
|
||||||
user.query("use db")
|
user.query("insert into t1 (ts) values (now())")
|
||||||
user.query("show tables")
|
else:
|
||||||
user.error("select * from ct1")
|
user.error("insert into t1 (ts) values (now())")
|
||||||
user.query("insert into t1 (ts) values (now())")
|
|
||||||
|
|
||||||
def grant_check_all(self, user="root", passwd="taosdata"):
|
|
||||||
with taos_connect(user=user, passwd=passwd) as user:
|
|
||||||
user.query("use db")
|
|
||||||
user.query("show tables")
|
|
||||||
user.query("select * from ct1")
|
|
||||||
user.query("insert into t1 (ts) values (now())")
|
|
||||||
|
|
||||||
def grant_check_none(self, user="root", passwd="taosdata"):
|
|
||||||
with taos_connect(user=user, passwd=passwd) as user:
|
|
||||||
user.query("use db")
|
|
||||||
user.query("show tables")
|
|
||||||
user.error("select * from ct1")
|
|
||||||
user.error("insert into t1 (ts) values (now())")
|
|
||||||
|
|
||||||
def test_grant_current(self):
|
def test_grant_current(self):
|
||||||
tdLog.printNoPrefix("==========step 1.0: if do not grant, can not read/write")
|
tdLog.printNoPrefix("==========step 1.0: if do not grant, can not read/write")
|
||||||
self.grant_check_none(user=self.__user_list[0], passwd=self.__passwd_list[0])
|
self.grant_check(user=self.__user_list[0], passwd=self.__passwd_list[0], priv=None)
|
||||||
|
|
||||||
tdLog.printNoPrefix("==========step 1.1: grant read, can read, can not write")
|
tdLog.printNoPrefix("==========step 1.1: grant read, can read, can not write")
|
||||||
sql = self.__grant_user_privileges(privilege=self.__privilege[1], user_name=self.__user_list[0])
|
sql = self.__grant_user_privileges(privilege=PRIVILEGES_READ, user_name=self.__user_list[0])
|
||||||
tdLog.info(sql)
|
tdLog.info(sql)
|
||||||
tdSql.query(sql)
|
tdSql.query(sql)
|
||||||
self.grant_check_read(user=self.__user_list[0], passwd=self.__passwd_list[0])
|
self.grant_check(user=self.__user_list[0], passwd=self.__passwd_list[0], priv=PRIVILEGES_READ)
|
||||||
|
|
||||||
tdLog.printNoPrefix("==========step 1.2: grant write, can write, can not read")
|
tdLog.printNoPrefix("==========step 1.2: grant write, can write, can not read")
|
||||||
sql = self.__grant_user_privileges(privilege=self.__privilege[2], user_name=self.__user_list[1])
|
sql = self.__grant_user_privileges(privilege=PRIVILEGES_WRITE, user_name=self.__user_list[1])
|
||||||
tdLog.info(sql)
|
tdLog.info(sql)
|
||||||
tdSql.query(sql)
|
tdSql.query(sql)
|
||||||
self.grant_check_write(user=self.__user_list[1], passwd=self.__passwd_list[1])
|
self.grant_check(user=self.__user_list[1], passwd=self.__passwd_list[1], priv=PRIVILEGES_WRITE)
|
||||||
|
|
||||||
tdLog.printNoPrefix("==========step 1.3: grant all, can write and read")
|
tdLog.printNoPrefix("==========step 1.3: grant all, can write and read")
|
||||||
sql = self.__grant_user_privileges(privilege=self.__privilege[0], user_name=self.__user_list[2])
|
sql = self.__grant_user_privileges(privilege=PRIVILEGES_ALL, user_name=self.__user_list[2])
|
||||||
tdLog.info(sql)
|
tdLog.info(sql)
|
||||||
tdSql.query(sql)
|
tdSql.query(sql)
|
||||||
self.grant_check_all(user=self.__user_list[2], passwd=self.__passwd_list[2])
|
self.grant_check_all(user=self.__user_list[2], passwd=self.__passwd_list[2])
|
||||||
|
|
Loading…
Reference in New Issue