Merge branch 'td_25179_update' of https://github.com/taosdata/TDengine into td_25179_update
This commit is contained in:
commit
37c39a1e46
|
@ -0,0 +1,126 @@
|
||||||
|
from itertools import product
|
||||||
|
import taos
|
||||||
|
import random
|
||||||
|
from taos.tmq import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.common import *
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.sqlset import *
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
# init the tdsql
|
||||||
|
tdSql.init(conn.cursor())
|
||||||
|
self.setsql = TDSetSql()
|
||||||
|
# user info
|
||||||
|
self.userNum = 100
|
||||||
|
self.basic_username = "user"
|
||||||
|
self.password = "pwd"
|
||||||
|
|
||||||
|
# db info
|
||||||
|
self.dbname = "user_privilege_multi_users"
|
||||||
|
self.stbname = 'stb'
|
||||||
|
self.ctbname_num = 100
|
||||||
|
self.column_dict = {
|
||||||
|
'ts': 'timestamp',
|
||||||
|
'col1': 'float',
|
||||||
|
'col2': 'int',
|
||||||
|
}
|
||||||
|
self.tag_dict = {
|
||||||
|
'ctbname': 'binary(10)'
|
||||||
|
}
|
||||||
|
|
||||||
|
self.privilege_list = []
|
||||||
|
|
||||||
|
def prepare_data(self):
|
||||||
|
"""Create the db and data for test
|
||||||
|
"""
|
||||||
|
# create datebase
|
||||||
|
tdSql.execute(f"create database {self.dbname}")
|
||||||
|
tdLog.debug("sql:" + f"create database {self.dbname}")
|
||||||
|
tdSql.execute(f"use {self.dbname}")
|
||||||
|
tdLog.debug("sql:" + f"use {self.dbname}")
|
||||||
|
|
||||||
|
# create super table
|
||||||
|
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict))
|
||||||
|
tdLog.debug("Create stable {} successfully".format(self.stbname))
|
||||||
|
for ctbIndex in range(self.ctbname_num):
|
||||||
|
ctname = f"ctb{ctbIndex}"
|
||||||
|
tdSql.execute(f"create table {ctname} using {self.stbname} tags('{ctname}')")
|
||||||
|
tdLog.debug("sql:" + f"create table {ctname} using {self.stbname} tags('{ctname}')")
|
||||||
|
|
||||||
|
def create_multiusers(self):
|
||||||
|
"""Create the user for test
|
||||||
|
"""
|
||||||
|
for userIndex in range(self.userNum):
|
||||||
|
username = f"{self.basic_username}{userIndex}"
|
||||||
|
tdSql.execute(f'create user {username} pass "{self.password}"')
|
||||||
|
tdLog.debug("sql:" + f'create user {username} pass "{self.password}"')
|
||||||
|
|
||||||
|
def grant_privilege(self):
|
||||||
|
"""Add the privilege for the users
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
for userIndex in range(self.userNum):
|
||||||
|
username = f"{self.basic_username}{userIndex}"
|
||||||
|
privilege = random.choice(["read", "write", "all"])
|
||||||
|
condition = f"ctbname='ctb{userIndex}'"
|
||||||
|
self.privilege_list.append({
|
||||||
|
"username": username,
|
||||||
|
"privilege": privilege,
|
||||||
|
"condition": condition
|
||||||
|
})
|
||||||
|
tdSql.execute(f'grant {privilege} on {self.dbname}.{self.stbname} with {condition} to {username}')
|
||||||
|
tdLog.debug("sql:" + f'grant {privilege} on {self.dbname}.{self.stbname} with {condition} to {username}')
|
||||||
|
except Exception as ex:
|
||||||
|
tdLog.exit(ex)
|
||||||
|
|
||||||
|
def remove_privilege(self):
|
||||||
|
"""Remove the privilege for the users
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
for item in self.privilege_list:
|
||||||
|
username = item["username"]
|
||||||
|
privilege = item["privilege"]
|
||||||
|
condition = item["condition"]
|
||||||
|
tdSql.execute(f'revoke {privilege} on {self.dbname}.{self.stbname} with {condition} from {username}')
|
||||||
|
tdLog.debug("sql:" + f'revoke {privilege} on {self.dbname}.{self.stbname} with {condition} from {username}')
|
||||||
|
except Exception as ex:
|
||||||
|
tdLog.exit(ex)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
"""
|
||||||
|
Check the information from information_schema.ins_user_privileges
|
||||||
|
"""
|
||||||
|
self.create_multiusers()
|
||||||
|
self.prepare_data()
|
||||||
|
# grant privilege to users
|
||||||
|
self.grant_privilege()
|
||||||
|
# check information_schema.ins_user_privileges
|
||||||
|
tdSql.query("select * from information_schema.ins_user_privileges;")
|
||||||
|
tdLog.debug("Current information_schema.ins_user_privileges values: {}".format(tdSql.queryResult))
|
||||||
|
if len(tdSql.queryResult) >= self.userNum:
|
||||||
|
tdLog.debug("case passed")
|
||||||
|
else:
|
||||||
|
tdLog.exit("The privilege number in information_schema.ins_user_privileges is incorrect")
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
# remove the privilege
|
||||||
|
self.remove_privilege()
|
||||||
|
# clear env
|
||||||
|
tdSql.execute(f"drop database {self.dbname}")
|
||||||
|
# remove the users
|
||||||
|
for userIndex in range(self.userNum):
|
||||||
|
username = f"{self.basic_username}{userIndex}"
|
||||||
|
tdSql.execute(f'drop user {username}')
|
||||||
|
# close the connection
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -0,0 +1,127 @@
|
||||||
|
import sys
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
from taos.tmq import *
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.dnodes import *
|
||||||
|
from util.common import *
|
||||||
|
sys.path.append("./7-tmq")
|
||||||
|
from tmqCommon import *
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
tdSql.init(conn.cursor(), False)
|
||||||
|
|
||||||
|
self.db_name = "tmq_db"
|
||||||
|
self.topic_name = "tmq_topic"
|
||||||
|
self.stable_name = "tmqst"
|
||||||
|
|
||||||
|
|
||||||
|
def prepareData(self):
|
||||||
|
# create database
|
||||||
|
tdSql.execute("create database if not exists %s;"%(self.db_name))
|
||||||
|
tdSql.execute("use %s;"%(self.db_name))
|
||||||
|
# create stable
|
||||||
|
tdSql.execute("create table %s.tmqst (ts timestamp, col0 int) tags(groupid int);"%(self.db_name))
|
||||||
|
# create child tables
|
||||||
|
tdSql.execute("create table tmqct_1 using %s.%s tags(1);"%(self.db_name, self.stable_name))
|
||||||
|
tdSql.execute("create table tmqct_2 using %s.%s tags(2);"%(self.db_name, self.stable_name))
|
||||||
|
tdSql.execute("create table tmqct_3 using %s.%s tags(3);"%(self.db_name, self.stable_name))
|
||||||
|
tdSql.execute("create table tmqct_4 using %s.%s tags(4);"%(self.db_name, self.stable_name))
|
||||||
|
tdSql.execute("create table tmqct_5 using %s.%s tags(5);"%(self.db_name, self.stable_name))
|
||||||
|
# insert into data
|
||||||
|
ctb_list = ["tmqct_1", "tmqct_2", "tmqct_3", "tmqct_4", "tmqct_5"]
|
||||||
|
for i in range(5):
|
||||||
|
sql = "insert into %s "%(ctb_list[i])
|
||||||
|
sql_values = "values"
|
||||||
|
for j in range(1000 * i, 1000 * (i+1)):
|
||||||
|
sql_values += "(%s, %s)"%("now" if j == 0 else "now+%s"%(str(j) + "s"), str(j))
|
||||||
|
sql += sql_values + ";"
|
||||||
|
tdLog.info(sql)
|
||||||
|
tdSql.execute(sql)
|
||||||
|
tdLog.info("Insert data into child tables successfully")
|
||||||
|
# create topic
|
||||||
|
tdSql.execute("create topic %s as select * from %s;"%(self.topic_name, self.stable_name))
|
||||||
|
|
||||||
|
def tmqSubscribe(self, inputDict):
|
||||||
|
consumer_dict = {
|
||||||
|
"group.id": inputDict['group_id'],
|
||||||
|
"client.id": "client",
|
||||||
|
"td.connect.user": "root",
|
||||||
|
"td.connect.pass": "taosdata",
|
||||||
|
"auto.commit.interval.ms": "1000",
|
||||||
|
"enable.auto.commit": inputDict['auto_commit'],
|
||||||
|
"auto.offset.reset": inputDict['offset_reset'],
|
||||||
|
"experimental.snapshot.enable": "false",
|
||||||
|
"msg.with.table.name": "false"
|
||||||
|
}
|
||||||
|
|
||||||
|
consumer = Consumer(consumer_dict)
|
||||||
|
try:
|
||||||
|
consumer.subscribe([inputDict['topic_name']])
|
||||||
|
except Exception as e:
|
||||||
|
tdLog.info("consumer.subscribe() fail ")
|
||||||
|
tdLog.info("%s"%(e))
|
||||||
|
|
||||||
|
tdLog.info("create consumer success!")
|
||||||
|
return consumer
|
||||||
|
|
||||||
|
def test_seek_and_committed_position_with_autocommit(self):
|
||||||
|
try:
|
||||||
|
self.prepareData()
|
||||||
|
inputDict = {
|
||||||
|
"topic_name": self.topic_name,
|
||||||
|
"group_id": "1",
|
||||||
|
"auto_commit": "true",
|
||||||
|
"offset_reset": "earliest"
|
||||||
|
}
|
||||||
|
consumer = self.tmqSubscribe(inputDict)
|
||||||
|
while(True):
|
||||||
|
res = consumer.poll(1)
|
||||||
|
if not res:
|
||||||
|
break
|
||||||
|
err = res.error()
|
||||||
|
if err is not None:
|
||||||
|
raise err
|
||||||
|
val = res.value()
|
||||||
|
for block in val:
|
||||||
|
tdLog.info("block.fetchall() number: %s"%(len(block.fetchall())))
|
||||||
|
|
||||||
|
partitions = consumer.assignment()
|
||||||
|
position_partitions = consumer.position(partitions)
|
||||||
|
tdLog.info("position_partitions: %s"%(position_partitions))
|
||||||
|
for i in range(len(position_partitions)):
|
||||||
|
tdLog.info("position_partitions[%s].offset: %s"%(i, position_partitions[i].offset))
|
||||||
|
committed_partitions = consumer.committed(partitions)
|
||||||
|
tdLog.info("committed_partitions: %s"%(committed_partitions))
|
||||||
|
for i in range(len(committed_partitions)):
|
||||||
|
tdLog.info("committed_partitions[%s].offset: %s"%(i, committed_partitions[i].offset))
|
||||||
|
assert(len(position_partitions) == len(committed_partitions))
|
||||||
|
for i in range(len(position_partitions)):
|
||||||
|
assert(position_partitions[i].offset == committed_partitions[i].offset)
|
||||||
|
# seek to the beginning of the topic
|
||||||
|
|
||||||
|
except Exception as ex:
|
||||||
|
raise Exception("Failed to test seek and committed position with autocommit with error: {}".format(str(ex)))
|
||||||
|
finally:
|
||||||
|
consumer.unsubscribe()
|
||||||
|
consumer.close()
|
||||||
|
|
||||||
|
def test_commit_by_offset(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.test_seek_and_committed_position_with_autocommit()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
Loading…
Reference in New Issue