From 87c2c7841d92d1e8d526835385a6604776e8d73a Mon Sep 17 00:00:00 2001 From: cpwu Date: Thu, 12 May 2022 12:43:57 +0800 Subject: [PATCH] add case --- tests/system-test/0-others/user_control.py | 148 ++++++++++++++++++++- 1 file changed, 144 insertions(+), 4 deletions(-) diff --git a/tests/system-test/0-others/user_control.py b/tests/system-test/0-others/user_control.py index 8b0a777465..24a347898a 100644 --- a/tests/system-test/0-others/user_control.py +++ b/tests/system-test/0-others/user_control.py @@ -1,3 +1,4 @@ +from distutils.command.config import config import taos import sys @@ -5,27 +6,166 @@ from util.log import * from util.sql import * from util.cases import * + PRIVILEGES_ALL = "ALL" PRIVILEGES_READ = "READ" PRIVILEGES_WRITE = "WRITE" +class TDconnect: + def __init__(self, + host = None, + port = None, + user = None, + password = None, + database = None, + config = None, + ) -> None: + self._conn = None + self._host = host + self._user = user + self._password = password + self._database = database + self._port = port + self._config = config + + def __enter__(self): + self._conn = taos.connect( + host =self._host, + port =self._port, + user =self._user, + password=self._password, + database=self._database, + config =self._config + ) + + self.cursor = self._conn.cursor() + return self.cursor + + + + def __exit__(self, types, values, trace): + if self._conn: + self.cursor.close() + self._conn.close() + +def taos_connect( + host = "127.0.0.1", + port = 6030, + user = "root", + passwd = "taosdata", + database= None, + config = None +): + return TDconnect( + host = host, + port=port, + user=user, + password=passwd, + database=database, + config=config + ) + class TDTestCase: def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) + @property + def __user_list(self): + return [f"user_test{i}" for i in range(self.users_count) ] + + @property + def __passwd_list(self): + return [f"taosdata{i}" for i in range(self.users_count) ] + + @property + def __privilege(self): + return [ PRIVILEGES_ALL, PRIVILEGES_READ, PRIVILEGES_WRITE ] + + def __priv_level(self, dbname=None): + return f"{dbname}.*" if dbname else "*.*" + + def create_user_current(self): + users = self.__user_list + passwds = self.__passwd_list for i in range(self.users_count): - tdSql.execute(f"create user test{i} pass 'taosdata{i}' ") + tdSql.execute(f"create user {users[i]} pass '{passwds[i]}' ") - def grant_user_privileges(self, dbname): - return "grant " + tdSql.query("show users") + tdSql.checkRows(self.users_count + 1) + def create_user_err(self): + sqls = [ + "create users u1 pass 'u1passwd' ", + "create user '' pass 'u1passwd' ", + "create user pass 'u1passwd' ", + "create user u1 pass u1passwd ", + "create user u1 password 'u1passwd' ", + "create user u1 pass u1passwd ", + "create user u1 pass '' ", + "create user u1 pass ' ' ", + "create user u1 pass ", + "create user u1 u2 pass 'u1passwd' 'u2passwd' ", + "create user u1 u2 pass 'u1passwd', 'u2passwd' ", + "create user u1, u2 pass 'u1passwd', 'u2passwd' ", + "create user u1, u2 pass 'u1passwd' 'u2passwd' ", + # length of user_name must <= 23 + "create user u12345678901234567890123 pass 'u1passwd' " , + # length of passwd must <= 15 + "create user u1 pass 'u123456789012345' " , + # password must have not " ' ~ ` \ + "create user u1 pass 'u1passwd\\' " , + "create user u1 pass 'u1passwd~' " , + "create user u1 pass 'u1passwd\"' " , + "create user u1 pass 'u1passwd\'' " , + "create user u1 pass 'u1passwd`' " , + # must after create a user named u1 + "create user u1 pass 'u1passwd' " , + ] + + tdSql.execute("create user u1 pass 'u1passwd' ") + for sql in sqls: + tdSql.error(sql) + + def grant_user_privileges(self, privilege, dbname=None, user_name="root"): + return f"GRANT {privilege} ON {self.__priv_level(dbname)} TO {user_name} " + + def test_user_create(self): + self.create_user_current() + self.create_user_err() def run(self): + + # 默认只有 root 用户 + tdLog.printNoPrefix("==========step0: init, user list only has root account") + tdSql.query("show users") + tdSql.checkData(0, 0, "root") + tdSql.checkData(0, 1, "super") + + # root用户权限 + # 创建用户测试 + tdLog.printNoPrefix("==========step1: create user test") self.users_count = 5 - tdSql.execute("") + self.test_user_create() + + # 查看用户 + tdLog.printNoPrefix("==========step2: show user test") + tdSql.query("show users") + tdSql.checkRows(self.users_count + 2) + + # 修改密码 + # + + # 密码登录认证 + + + # 删除用户测试 + tdLog.printNoPrefix("==========step2: drop user") + + +