build: delete TaosConsumer and TaosTmq from taospy (#20076) (#20091)

This commit is contained in:
sunpeng 2023-02-22 17:32:14 +08:00 committed by GitHub
parent bd79f4c26b
commit d4fdf17cae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 645 additions and 620 deletions

View File

@ -18,7 +18,8 @@ from __future__ import annotations
from typing import Any, Set, Tuple from typing import Any, Set, Tuple
from typing import Dict from typing import Dict
from typing import List from typing import List
from typing import Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none from typing import \
Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none
import textwrap import textwrap
import time import time
@ -39,7 +40,6 @@ import gc
import taos import taos
from taos.tmq import * from taos.tmq import *
from .shared.types import TdColumns, TdTags from .shared.types import TdColumns, TdTags
# from crash_gen import ServiceManager, TdeInstance, TdeSubProcess # from crash_gen import ServiceManager, TdeInstance, TdeSubProcess
@ -69,6 +69,7 @@ gSvcMgr: Optional[ServiceManager] # TODO: refactor this hack, use dep injecti
# logger: logging.Logger # logger: logging.Logger
gContainer: Container gContainer: Container
# def runThread(wt: WorkerThread): # def runThread(wt: WorkerThread):
# wt.run() # wt.run()
@ -163,7 +164,6 @@ class WorkerThread:
Logging.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") Logging.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
break break
# Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more) # Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more)
try: try:
if (Config.getConfig().per_thread_db_connection): # most likely TRUE if (Config.getConfig().per_thread_db_connection): # most likely TRUE
@ -172,7 +172,8 @@ class WorkerThread:
# self.useDb() # might encounter exceptions. TODO: catch # self.useDb() # might encounter exceptions. TODO: catch
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno) errno = Helper.convertErrno(err.errno)
if errno in [0x383, 0x386, 0x00B, 0x014] : # invalid database, dropping, Unable to establish connection, Database not ready if errno in [0x383, 0x386, 0x00B,
0x014]: # invalid database, dropping, Unable to establish connection, Database not ready
# ignore # ignore
dummy = 0 dummy = 0
else: else:
@ -251,6 +252,7 @@ class WorkerThread:
# else: # else:
# return self._tc.getDbState().getDbConn().query(sql) # return self._tc.getDbState().getDbConn().query(sql)
# The coordinator of all worker threads, mostly running in main thread # The coordinator of all worker threads, mostly running in main thread
@ -374,7 +376,8 @@ class ThreadCoordinator:
# TODO: saw an error here once, let's print out stack info for err? # TODO: saw an error here once, let's print out stack info for err?
traceback.print_stack() # Stack frame to here. traceback.print_stack() # Stack frame to here.
Logging.info("Caused by:") Logging.info("Caused by:")
traceback.print_exception(*sys.exc_info()) # Ref: https://www.geeksforgeeks.org/how-to-print-exception-stack-trace-in-python/ traceback.print_exception(
*sys.exc_info()) # Ref: https://www.geeksforgeeks.org/how-to-print-exception-stack-trace-in-python/
transitionFailed = True transitionFailed = True
self._te = None # Not running any more self._te = None # Not running any more
self._execStats.registerFailure("State transition error: {}".format(err)) self._execStats.registerFailure("State transition error: {}".format(err))
@ -409,7 +412,6 @@ class ThreadCoordinator:
# print("\n") # print("\n")
# print(h.heap()) # print(h.heap())
try: try:
self._syncAtBarrier() # For now just cross the barrier self._syncAtBarrier() # For now just cross the barrier
Progress.emit(Progress.END_THREAD_STEP) Progress.emit(Progress.END_THREAD_STEP)
@ -492,7 +494,6 @@ class ThreadCoordinator:
self._execStats = None self._execStats = None
self._runStatus = None self._runStatus = None
def printStats(self): def printStats(self):
self._execStats.printStats() self._execStats.printStats()
@ -564,6 +565,7 @@ class ThreadCoordinator:
with self._lock: with self._lock:
self._executedTasks.append(task) self._executedTasks.append(task)
class ThreadPool: class ThreadPool:
def __init__(self, numThreads, maxSteps): def __init__(self, numThreads, maxSteps):
self.numThreads = numThreads self.numThreads = numThreads
@ -587,6 +589,7 @@ class ThreadPool:
def cleanup(self): def cleanup(self):
self.threadList = [] # maybe clean up each? self.threadList = [] # maybe clean up each?
# A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers # A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers
# for new table names # for new table names
@ -801,7 +804,8 @@ class AnyState:
for task in tasks: for task in tasks:
if isinstance(task, cls): if isinstance(task, cls):
raise CrashGenError( raise CrashGenError(
"This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__)) "This task: {}, is not expected to be present, given the success/failure of others".format(
cls.__name__))
def assertNoSuccess(self, tasks, cls): def assertNoSuccess(self, tasks, cls):
for task in tasks: for task in tasks:
@ -1016,7 +1020,6 @@ class StateMechine:
sTable = self._db.getFixedSuperTable() sTable = self._db.getFixedSuperTable()
if sTable.hasRegTables(dbc): # no regular tables if sTable.hasRegTables(dbc): # no regular tables
# print("debug=====*\n"*100) # print("debug=====*\n"*100)
Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time())) Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
@ -1096,10 +1099,13 @@ class StateMechine:
weightsTypes = BasicTypes.copy() weightsTypes = BasicTypes.copy()
# this matrixs can balance the Frequency of TaskTypes # this matrixs can balance the Frequency of TaskTypes
balance_TaskType_matrixs = {'TaskDropDb': 5 , 'TaskDropTopics': 20 , 'TaskDropStreams':10 , 'TaskDropStreamTables':10 , balance_TaskType_matrixs = {'TaskDropDb': 5, 'TaskDropTopics': 20, 'TaskDropStreams': 10,
'TaskDropStreamTables': 10,
'TaskReadData': 50, 'TaskDropSuperTable': 5, 'TaskAlterTags': 3, 'TaskAddData': 10, 'TaskReadData': 50, 'TaskDropSuperTable': 5, 'TaskAlterTags': 3, 'TaskAddData': 10,
'TaskDeleteData':10 , 'TaskCreateDb':10 , 'TaskCreateStream': 3, 'TaskCreateTopic' :3, 'TaskDeleteData': 10, 'TaskCreateDb': 10, 'TaskCreateStream': 3,
'TaskCreateConsumers':10, 'TaskCreateSuperTable': 10 } # TaskType : balance_matrixs of task 'TaskCreateTopic': 3,
'TaskCreateConsumers': 10,
'TaskCreateSuperTable': 10} # TaskType : balance_matrixs of task
for task, weights in balance_TaskType_matrixs.items(): for task, weights in balance_TaskType_matrixs.items():
@ -1111,7 +1117,6 @@ class StateMechine:
task = random.sample(weightsTypes, 1) task = random.sample(weightsTypes, 1)
return task[0] return task[0]
# ref: # ref:
# https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/ # https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
def _weighted_choice_sub(self, weights) -> int: def _weighted_choice_sub(self, weights) -> int:
@ -1123,6 +1128,7 @@ class StateMechine:
return i return i
raise CrashGenError("Unexpected no choice") raise CrashGenError("Unexpected no choice")
class Database: class Database:
''' We use this to represent an actual TDengine database inside a service instance, ''' We use this to represent an actual TDengine database inside a service instance,
possibly in a cluster environment. possibly in a cluster environment.
@ -1194,7 +1200,8 @@ class Database:
500 # a number representing seconds within 10 years 500 # a number representing seconds within 10 years
# print("elSec = {}".format(elSec)) # print("elSec = {}".format(elSec))
t3 = datetime.datetime(local_epoch_time[0]-10, local_epoch_time[1], local_epoch_time[2]) # default "keep" is 10 years t3 = datetime.datetime(local_epoch_time[0] - 10, local_epoch_time[1],
local_epoch_time[2]) # default "keep" is 10 years
t4 = datetime.datetime.fromtimestamp( t4 = datetime.datetime.fromtimestamp(
t3.timestamp() + elSec2) # see explanation above t3.timestamp() + elSec2) # see explanation above
Logging.debug("Setting up TICKS to start from: {}".format(t4)) Logging.debug("Setting up TICKS to start from: {}".format(t4))
@ -1210,11 +1217,14 @@ class Database:
# 10k at 1/20 chance, should be enough to avoid overlaps # 10k at 1/20 chance, should be enough to avoid overlaps
tick = cls.setupLastTick() tick = cls.setupLastTick()
cls._lastTick = tick cls._lastTick = tick
cls._lastLaggingTick = tick + datetime.timedelta(0, -60*2) # lagging behind 2 minutes, should catch up fast cls._lastLaggingTick = tick + datetime.timedelta(0,
-60 * 2) # lagging behind 2 minutes, should catch up fast
# if : # should be quite a bit into the future # if : # should be quite a bit into the future
if Config.isSet('mix_oos_data') and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick if Config.isSet('mix_oos_data') and Dice.throw(
cls._lastLaggingTick += datetime.timedelta(0, 1) # pick the next sequence from the lagging tick sequence 20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick
cls._lastLaggingTick += datetime.timedelta(0,
1) # pick the next sequence from the lagging tick sequence
return cls._lastLaggingTick return cls._lastLaggingTick
else: # regular else: # regular
# add one second to it # add one second to it
@ -1334,8 +1344,6 @@ class Task():
self._execStats = execStats self._execStats = execStats
self._db = db # A task is always associated/for a specific DB self._db = db # A task is always associated/for a specific DB
def isSuccess(self): def isSuccess(self):
return self._err is None return self._err is None
@ -1417,9 +1425,6 @@ class Task():
0x0203, # Invalid value 0x0203, # Invalid value
0x03f0, # Stream already exist , topic already exists 0x03f0, # Stream already exist , topic already exists
1000 # REST catch-all error 1000 # REST catch-all error
]: ]:
return True # These are the ALWAYS-ACCEPTABLE ones return True # These are the ALWAYS-ACCEPTABLE ones
@ -1443,7 +1448,6 @@ class Task():
return False # Not an acceptable error return False # Not an acceptable error
def execute(self, wt: WorkerThread): def execute(self, wt: WorkerThread):
wt.verifyThreadSelf() wt.verifyThreadSelf()
self._workerThread = wt # type: ignore self._workerThread = wt # type: ignore
@ -1485,7 +1489,8 @@ class Task():
# raise # so that we see full stack # raise # so that we see full stack
traceback.print_exc() traceback.print_exc()
print( print(
"\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) + "\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(
errMsg) +
"----------------------------\n") "----------------------------\n")
# sys.exit(-1) # sys.exit(-1)
self._err = err self._err = err
@ -1718,10 +1723,15 @@ class TaskCreateDb(StateTransitionTask):
cache_model = Dice.choice(['none', 'last_row', 'last_value', 'both']) cache_model = Dice.choice(['none', 'last_row', 'last_value', 'both'])
buffer = random.randint(3, 128) buffer = random.randint(3, 128)
dbName = self._db.getName() dbName = self._db.getName()
self.execWtSql(wt, "create database {} {} {} vgroups {} cachemodel '{}' buffer {} ".format(dbName, repStr, updatePostfix, vg_nums, cache_model,buffer ) ) self.execWtSql(wt, "create database {} {} {} vgroups {} cachemodel '{}' buffer {} ".format(dbName, repStr,
updatePostfix,
vg_nums,
cache_model,
buffer))
if dbName == "db_0" and Config.getConfig().use_shadow_db: if dbName == "db_0" and Config.getConfig().use_shadow_db:
self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix)) self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix))
class TaskDropDb(StateTransitionTask): class TaskDropDb(StateTransitionTask):
@classmethod @classmethod
def getEndState(cls): def getEndState(cls):
@ -1734,7 +1744,8 @@ class TaskDropDb(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
try: try:
self.queryWtSql(wt, "drop database {}".format(self._db.getName())) # drop database maybe failed ,because topic exists self.queryWtSql(wt, "drop database {}".format(
self._db.getName())) # drop database maybe failed ,because topic exists
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno) errno = Helper.convertErrno(err.errno)
if errno in [0x0203]: # drop maybe failed if errno in [0x0203]: # drop maybe failed
@ -1769,7 +1780,8 @@ class TaskCreateStream(StateTransitionTask):
stbname = sTable.getName() stbname = sTable.getName()
sub_tables = sTable.getRegTables(wt.getDbConn()) sub_tables = sTable.getRegTables(wt.getDbConn())
aggExpr = Dice.choice([ aggExpr = Dice.choice([
'count(*)', 'avg(speed)', 'sum(speed)', 'stddev(speed)','min(speed)', 'max(speed)', 'first(speed)', 'last(speed)', 'count(*)', 'avg(speed)', 'sum(speed)', 'stddev(speed)', 'min(speed)', 'max(speed)', 'first(speed)',
'last(speed)',
'apercentile(speed, 10)', 'last_row(*)', 'twa(speed)']) 'apercentile(speed, 10)', 'last_row(*)', 'twa(speed)'])
stream_sql = '' # set default value stream_sql = '' # set default value
@ -1814,19 +1826,25 @@ class TaskCreateTopic(StateTransitionTask):
stbname = sTable.getName() stbname = sTable.getName()
sub_tables = sTable.getRegTables(wt.getDbConn()) sub_tables = sTable.getRegTables(wt.getDbConn())
scalarExpr = Dice.choice([ '*','speed','color','abs(speed)','acos(speed)','asin(speed)','atan(speed)','ceil(speed)','cos(speed)','cos(speed)', scalarExpr = Dice.choice(
'floor(speed)','log(speed,2)','pow(speed,2)','round(speed)','sin(speed)','sqrt(speed)','char_length(color)','concat(color,color)', ['*', 'speed', 'color', 'abs(speed)', 'acos(speed)', 'asin(speed)', 'atan(speed)', 'ceil(speed)',
'concat_ws(" ", color,color," ")','length(color)', 'lower(color)', 'ltrim(color)','substr(color , 2)','upper(color)','cast(speed as double)', 'cos(speed)', 'cos(speed)',
'floor(speed)', 'log(speed,2)', 'pow(speed,2)', 'round(speed)', 'sin(speed)', 'sqrt(speed)',
'char_length(color)', 'concat(color,color)',
'concat_ws(" ", color,color," ")', 'length(color)', 'lower(color)', 'ltrim(color)', 'substr(color , 2)',
'upper(color)', 'cast(speed as double)',
'cast(ts as bigint)']) 'cast(ts as bigint)'])
topic_sql = '' # set default value topic_sql = '' # set default value
if Dice.throw(3) == 0: # create topic : source data from sub query if Dice.throw(3) == 0: # create topic : source data from sub query
if sub_tables: # if not empty if sub_tables: # if not empty
sub_tbname = sub_tables[0] sub_tbname = sub_tables[0]
# create topic : source data from sub query of sub stable # create topic : source data from sub query of sub stable
topic_sql = 'create topic {} as select {} FROM {}.{} ; '.format(sub_topic_name,scalarExpr,dbname,sub_tbname) topic_sql = 'create topic {} as select {} FROM {}.{} ; '.format(sub_topic_name, scalarExpr, dbname,
sub_tbname)
else: # create topic : source data from sub query of stable else: # create topic : source data from sub query of stable
topic_sql = 'create topic {} as select {} FROM {}.{} '.format(super_topic_name,scalarExpr, dbname,stbname) topic_sql = 'create topic {} as select {} FROM {}.{} '.format(super_topic_name, scalarExpr, dbname,
stbname)
elif Dice.throw(3) == 1: # create topic : source data from super table elif Dice.throw(3) == 1: # create topic : source data from super table
topic_sql = 'create topic {} AS STABLE {}.{} '.format(stable_topic, dbname, stbname) topic_sql = 'create topic {} AS STABLE {}.{} '.format(stable_topic, dbname, stbname)
@ -1840,6 +1858,7 @@ class TaskCreateTopic(StateTransitionTask):
self.execWtSql(wt, topic_sql) self.execWtSql(wt, topic_sql)
Logging.debug("[OPS] db topic is creating at {}".format(time.time())) Logging.debug("[OPS] db topic is creating at {}".format(time.time()))
class TaskDropTopics(StateTransitionTask): class TaskDropTopics(StateTransitionTask):
@classmethod @classmethod
@ -1853,7 +1872,6 @@ class TaskDropTopics(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
dbname = self._db.getName() dbname = self._db.getName()
if not self._db.exists(wt.getDbConn()): if not self._db.exists(wt.getDbConn()):
Logging.debug("Skipping task, no DB yet") Logging.debug("Skipping task, no DB yet")
return return
@ -1865,6 +1883,7 @@ class TaskDropTopics(StateTransitionTask):
sTable.dropTopics(wt.getDbConn(), dbname, None) # drop topics of database sTable.dropTopics(wt.getDbConn(), dbname, None) # drop topics of database
sTable.dropTopics(wt.getDbConn(), dbname, tblName) # drop topics of stable sTable.dropTopics(wt.getDbConn(), dbname, tblName) # drop topics of stable
class TaskDropStreams(StateTransitionTask): class TaskDropStreams(StateTransitionTask):
@classmethod @classmethod
@ -1878,7 +1897,6 @@ class TaskDropStreams(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# dbname = self._db.getName() # dbname = self._db.getName()
if not self._db.exists(wt.getDbConn()): if not self._db.exists(wt.getDbConn()):
Logging.debug("Skipping task, no DB yet") Logging.debug("Skipping task, no DB yet")
return return
@ -1889,6 +1907,7 @@ class TaskDropStreams(StateTransitionTask):
if sTable.hasStreams(wt.getDbConn()): if sTable.hasStreams(wt.getDbConn()):
sTable.dropStreams(wt.getDbConn()) # drop stream of database sTable.dropStreams(wt.getDbConn()) # drop stream of database
class TaskDropStreamTables(StateTransitionTask): class TaskDropStreamTables(StateTransitionTask):
@classmethod @classmethod
@ -1902,7 +1921,6 @@ class TaskDropStreamTables(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# dbname = self._db.getName() # dbname = self._db.getName()
if not self._db.exists(wt.getDbConn()): if not self._db.exists(wt.getDbConn()):
Logging.debug("Skipping task, no DB yet") Logging.debug("Skipping task, no DB yet")
return return
@ -1913,6 +1931,7 @@ class TaskDropStreamTables(StateTransitionTask):
if sTable.hasStreamTables(wt.getDbConn()): if sTable.hasStreamTables(wt.getDbConn()):
sTable.dropStreamTables(wt.getDbConn()) # drop stream tables sTable.dropStreamTables(wt.getDbConn()) # drop stream tables
class TaskCreateConsumers(StateTransitionTask): class TaskCreateConsumers(StateTransitionTask):
@classmethod @classmethod
@ -1974,7 +1993,6 @@ class TdSuperTable:
def getName(self): def getName(self):
return self._stName return self._stName
def drop(self, dbc, skipCheck=False): def drop(self, dbc, skipCheck=False):
dbName = self._dbName dbName = self._dbName
if self.exists(dbc): # if myself exists if self.exists(dbc): # if myself exists
@ -2018,27 +2036,18 @@ class TdSuperTable:
def createConsumer(self, dbc, Consumer_nums): def createConsumer(self, dbc, Consumer_nums):
def generateConsumer(current_topic_list): def generateConsumer(current_topic_list):
conf = TaosTmqConf() consumer = Consumer({"group.id": "tg2", "td.connect.user": "root", "td.connect.pass": "taosdata"})
conf.set("group.id", "tg2") topic_list = []
conf.set("td.connect.user", "root")
conf.set("td.connect.pass", "taosdata")
# conf.set("enable.auto.commit", "true")
# def tmq_commit_cb_print(tmq, resp, offset, param=None):
# print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
# conf.set_auto_commit_cb(tmq_commit_cb_print, None)
consumer = conf.new_consumer()
topic_list = TaosTmqList()
for topic in current_topic_list: for topic in current_topic_list:
topic_list.append(topic) topic_list.append(topic)
try:
consumer.subscribe(topic_list) consumer.subscribe(topic_list)
except TmqError as e :
pass
# consumer with random work life # consumer with random work life
time_start = time.time() time_start = time.time()
while 1: while 1:
res = consumer.poll(1000) res = consumer.poll(1)
consumer.commit(res)
if time.time() - time_start > random.randint(5, 50): if time.time() - time_start > random.randint(5, 50):
break break
try: try:
@ -2067,14 +2076,16 @@ class TdSuperTable:
def getRegTables(self, dbc: DbConn): def getRegTables(self, dbc: DbConn):
dbName = self._dbName dbName = self._dbName
try: try:
dbc.query("select distinct TBNAME from {}.{}".format(dbName, self._stName)) # TODO: analyze result set later dbc.query("select distinct TBNAME from {}.{}".format(dbName,
self._stName)) # TODO: analyze result set later
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno2 = Helper.convertErrno(err.errno) errno2 = Helper.convertErrno(err.errno)
Logging.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err)) Logging.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
raise raise
qr = dbc.getQueryResult() qr = dbc.getQueryResult()
return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation return [v[0] for v in
qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation
def hasRegTables(self, dbc: DbConn): def hasRegTables(self, dbc: DbConn):
@ -2317,7 +2328,6 @@ class TdSuperTable:
]) # TODO: add more from 'top' ]) # TODO: add more from 'top'
# if aggExpr not in ['stddev(speed)']: # STDDEV not valid for super tables?! (Done in TD-1049) # if aggExpr not in ['stddev(speed)']: # STDDEV not valid for super tables?! (Done in TD-1049)
sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName()) sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName())
if Dice.throw(3) == 0: # 1 in X chance if Dice.throw(3) == 0: # 1 in X chance
@ -2329,6 +2339,7 @@ class TdSuperTable:
return ret return ret
class TaskReadData(StateTransitionTask): class TaskReadData(StateTransitionTask):
@classmethod @classmethod
def getEndState(cls): def getEndState(cls):
@ -2368,7 +2379,6 @@ class TaskReadData(StateTransitionTask):
# by now, causing error below to be incorrectly handled due to timing issue # by now, causing error below to be incorrectly handled due to timing issue
return # TODO: fix server restart status race condtion return # TODO: fix server restart status race condtion
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
self._reconnectIfNeeded(wt) self._reconnectIfNeeded(wt)
@ -2386,6 +2396,7 @@ class TaskReadData(StateTransitionTask):
Logging.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql())) Logging.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
raise raise
class SqlQuery: class SqlQuery:
@classmethod @classmethod
def buildRandom(cls, db: Database): def buildRandom(cls, db: Database):
@ -2399,6 +2410,7 @@ class SqlQuery:
def getSql(self): def getSql(self):
return self._sql return self._sql
class TaskDropSuperTable(StateTransitionTask): class TaskDropSuperTable(StateTransitionTask):
@classmethod @classmethod
def getEndState(cls): def getEndState(cls):
@ -2430,7 +2442,6 @@ class TaskDropSuperTable(StateTransitionTask):
Logging.debug("[DB] Acceptable error when dropping a table") Logging.debug("[DB] Acceptable error when dropping a table")
continue # try to delete next regular table continue # try to delete next regular table
if (not tickOutput): if (not tickOutput):
tickOutput = True # Print only one time tickOutput = True # Print only one time
if isSuccess: if isSuccess:
@ -2443,8 +2454,6 @@ class TaskDropSuperTable(StateTransitionTask):
self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName)) self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName))
class TaskAlterTags(StateTransitionTask): class TaskAlterTags(StateTransitionTask):
@classmethod @classmethod
def getEndState(cls): def getEndState(cls):
@ -2472,6 +2481,7 @@ class TaskAlterTags(StateTransitionTask):
sTable.changeTag(dbc, "extraTag", "newTag") sTable.changeTag(dbc, "extraTag", "newTag")
# sql = "alter table db.{} change tag extraTag newTag".format(tblName) # sql = "alter table db.{} change tag extraTag newTag".format(tblName)
class TaskRestartService(StateTransitionTask): class TaskRestartService(StateTransitionTask):
_isRunning = False _isRunning = False
_classLock = threading.Lock() _classLock = threading.Lock()
@ -2487,6 +2497,7 @@ class TaskRestartService(StateTransitionTask):
return False # don't run this otherwise return False # don't run this otherwise
CHANCE_TO_RESTART_SERVICE = 200 CHANCE_TO_RESTART_SERVICE = 200
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
if not Config.getConfig().auto_start_service: # only execute when we are in -a mode if not Config.getConfig().auto_start_service: # only execute when we are in -a mode
print("_a", end="", flush=True) print("_a", end="", flush=True)
@ -2500,11 +2511,13 @@ class TaskRestartService(StateTransitionTask):
if Dice.throw(self.CHANCE_TO_RESTART_SERVICE) == 0: # 1 in N chance if Dice.throw(self.CHANCE_TO_RESTART_SERVICE) == 0: # 1 in N chance
dbc = wt.getDbConn() dbc = wt.getDbConn()
dbc.execute("select * from information_schema.ins_databases") # simple delay, align timing with other workers dbc.execute(
"select * from information_schema.ins_databases") # simple delay, align timing with other workers
gSvcMgr.restart() gSvcMgr.restart()
self._isRunning = False self._isRunning = False
class TaskAddData(StateTransitionTask): class TaskAddData(StateTransitionTask):
# Track which table is being actively worked on # Track which table is being actively worked on
activeTable: Set[int] = set() activeTable: Set[int] = set()
@ -2571,8 +2584,6 @@ class TaskAddData(StateTransitionTask):
# Logging.info("Data added in batch: {}".format(sql)) # Logging.info("Data added in batch: {}".format(sql))
self._unlockTableIfNeeded(fullTableName) self._unlockTableIfNeeded(fullTableName)
def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
@ -2590,7 +2601,8 @@ class TaskAddData(StateTransitionTask):
# TODO: too ugly trying to lock the table reliably, refactor... # TODO: too ugly trying to lock the table reliably, refactor...
fullTableName = db.getName() + '.' + regTableName fullTableName = db.getName() + '.' + regTableName
self._lockTableIfNeeded(fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock self._lockTableIfNeeded(
fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock
try: try:
sql = "INSERT INTO {} VALUES ('{}', {}, '{}');".format( # removed: tags ('{}', {}) sql = "INSERT INTO {} VALUES ('{}', {}, '{}');".format( # removed: tags ('{}', {})
@ -2604,7 +2616,8 @@ class TaskAddData(StateTransitionTask):
intWrote = intToWrite intWrote = intToWrite
# Quick hack, attach an update statement here. TODO: create an "update" task # Quick hack, attach an update statement here. TODO: create an "update" task
if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB if (not Config.getConfig().use_shadow_db) and Dice.throw(
5) == 0: # 1 in N chance, plus not using shaddow DB
intToUpdate = db.getNextInt() # Updated but should not succeed intToUpdate = db.getNextInt() # Updated but should not succeed
nextColor = db.getNextColor() nextColor = db.getNextColor()
sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here
@ -2692,6 +2705,7 @@ class TaskAddData(StateTransitionTask):
self.activeTable.discard(i) # not raising an error, unlike remove self.activeTable.discard(i) # not raising an error, unlike remove
class TaskDeleteData(StateTransitionTask): class TaskDeleteData(StateTransitionTask):
# Track which table is being actively worked on # Track which table is being actively worked on
activeTable: Set[int] = set() activeTable: Set[int] = set()
@ -2756,7 +2770,8 @@ class TaskDeleteData(StateTransitionTask):
# TODO: too ugly trying to lock the table reliably, refactor... # TODO: too ugly trying to lock the table reliably, refactor...
fullTableName = db.getName() + '.' + regTableName fullTableName = db.getName() + '.' + regTableName
self._lockTableIfNeeded(fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock self._lockTableIfNeeded(
fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock
try: try:
sql = "delete from {} where ts = '{}' ;".format( # removed: tags ('{}', {}) sql = "delete from {} where ts = '{}' ;".format( # removed: tags ('{}', {})
@ -2772,7 +2787,8 @@ class TaskDeleteData(StateTransitionTask):
intWrote = intToWrite intWrote = intToWrite
# Quick hack, attach an update statement here. TODO: create an "update" task # Quick hack, attach an update statement here. TODO: create an "update" task
if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB if (not Config.getConfig().use_shadow_db) and Dice.throw(
5) == 0: # 1 in N chance, plus not using shaddow DB
intToUpdate = db.getNextInt() # Updated but should not succeed intToUpdate = db.getNextInt() # Updated but should not succeed
# nextColor = db.getNextColor() # nextColor = db.getNextColor()
sql = "delete from {} where ts = '{}' ;".format( # "INSERt" means "update" here sql = "delete from {} where ts = '{}' ;".format( # "INSERt" means "update" here
@ -2827,7 +2843,8 @@ class TaskDeleteData(StateTransitionTask):
# TODO: too ugly trying to lock the table reliably, refactor... # TODO: too ugly trying to lock the table reliably, refactor...
fullTableName = db.getName() + '.' + regTableName fullTableName = db.getName() + '.' + regTableName
self._lockTableIfNeeded(fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock self._lockTableIfNeeded(
fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock
try: try:
sql = "delete from {} ;".format( # removed: tags ('{}', {}) sql = "delete from {} ;".format( # removed: tags ('{}', {})
@ -2837,7 +2854,8 @@ class TaskDeleteData(StateTransitionTask):
# Logging.info("Data added: {}".format(sql)) # Logging.info("Data added: {}".format(sql))
# Quick hack, attach an update statement here. TODO: create an "update" task # Quick hack, attach an update statement here. TODO: create an "update" task
if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB if (not Config.getConfig().use_shadow_db) and Dice.throw(
5) == 0: # 1 in N chance, plus not using shaddow DB
sql = "delete from {} ;".format( # "INSERt" means "update" here sql = "delete from {} ;".format( # "INSERt" means "update" here
fullTableName) fullTableName)
dbc.execute(sql) dbc.execute(sql)
@ -2937,7 +2955,9 @@ class ThreadStacks: # stack info for all threads
lastSqlForThread = DbConn.fetchSqlForThread(shortTid) lastSqlForThread = DbConn.fetchSqlForThread(shortTid)
last_sql_commit_time = DbConn.get_save_sql_time(shortTid) last_sql_commit_time = DbConn.get_save_sql_time(shortTid)
# time_cost = DbConn.get_time_cost() # time_cost = DbConn.get_time_cost()
print("Last SQL statement attempted from thread {} ({:.4f} sec ago) is: {}".format(shortTid, self.current_time-last_sql_commit_time ,lastSqlForThread)) print("Last SQL statement attempted from thread {} ({:.4f} sec ago) is: {}".format(shortTid,
self.current_time - last_sql_commit_time,
lastSqlForThread))
stackFrame = 0 stackFrame = 0
for frame in stack: # was using: reversed(stack) for frame in stack: # was using: reversed(stack)
# print(frame) # print(frame)
@ -2949,6 +2969,7 @@ class ThreadStacks: # stack info for all threads
if self.current_time - last_sql_commit_time > 100: # dead lock occured if self.current_time - last_sql_commit_time > 100: # dead lock occured
print("maybe dead locked of thread {} ".format(shortTid)) print("maybe dead locked of thread {} ".format(shortTid))
class ClientManager: class ClientManager:
def __init__(self): def __init__(self):
Logging.info("Starting service manager") Logging.info("Starting service manager")
@ -3062,7 +3083,6 @@ class ClientManager:
svcMgr.stopTaosServices() svcMgr.stopTaosServices()
svcMgr = None svcMgr = None
# Release global variables # Release global variables
# gConfig = None # gConfig = None
Config.clearConfig() Config.clearConfig()
@ -3093,6 +3113,7 @@ class ClientManager:
# self.tc.getDbManager().cleanUp() # clean up first, so we can show ZERO db connections # self.tc.getDbManager().cleanUp() # clean up first, so we can show ZERO db connections
self.tc.printStats() self.tc.printStats()
class MainExec: class MainExec:
def __init__(self): def __init__(self):
self._clientMgr = None self._clientMgr = None
@ -3131,7 +3152,8 @@ class MainExec:
def runService(self): def runService(self):
global gSvcMgr global gSvcMgr
gSvcMgr = self._svcMgr = ServiceManager(Config.getConfig().num_dnodes) # save it in a global variable TODO: hack alert gSvcMgr = self._svcMgr = ServiceManager(
Config.getConfig().num_dnodes) # save it in a global variable TODO: hack alert
gSvcMgr.run() # run to some end state gSvcMgr.run() # run to some end state
gSvcMgr = self._svcMgr = None gSvcMgr = self._svcMgr = None
@ -3259,7 +3281,6 @@ class MainExec:
return parser return parser
def init(self): # TODO: refactor def init(self): # TODO: refactor
global gContainer global gContainer
gContainer = Container() # micky-mouse DI gContainer = Container() # micky-mouse DI

View File

@ -12,12 +12,13 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import taos import taos
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
from taos.tmq import * from taos.tmq import *
from util.cases import *
from util.common import *
from util.log import *
from util.sql import *
from util.sqlset import *
class TDTestCase: class TDTestCase:
def init(self, conn, logSql, replicaVar=1): def init(self, conn, logSql, replicaVar=1):
@ -67,14 +68,17 @@ class TDTestCase:
f'now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据"' f'now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据"'
] ]
self.tbnum = 1 self.tbnum = 1
def prepare_data(self): def prepare_data(self):
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict)) tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict))
for i in range(self.tbnum): for i in range(self.tbnum):
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})') tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})')
for j in self.values_list: for j in self.values_list:
tdSql.execute(f'insert into {self.stbname}_{i} values({j})') tdSql.execute(f'insert into {self.stbname}_{i} values({j})')
def create_user(self): def create_user(self):
for user_name in ['jiacy1_all','jiacy1_read','jiacy1_write','jiacy1_none','jiacy0_all','jiacy0_read','jiacy0_write','jiacy0_none']: for user_name in ['jiacy1_all', 'jiacy1_read', 'jiacy1_write', 'jiacy1_none', 'jiacy0_all', 'jiacy0_read',
'jiacy0_write', 'jiacy0_none']:
if 'jiacy1' in user_name.lower(): if 'jiacy1' in user_name.lower():
tdSql.execute(f'create user {user_name} pass "123" sysinfo 1') tdSql.execute(f'create user {user_name} pass "123" sysinfo 1')
elif 'jiacy0' in user_name.lower(): elif 'jiacy0' in user_name.lower():
@ -101,6 +105,7 @@ class TDTestCase:
self.queryResult = None self.queryResult = None
tdLog.info(f"sql:{sql}, expect error occured") tdLog.info(f"sql:{sql}, expect error occured")
pass pass
def drop_topic(self): def drop_topic(self):
jiacy1_all_conn = taos.connect(user='jiacy1_all', password='123') jiacy1_all_conn = taos.connect(user='jiacy1_all', password='123')
jiacy1_read_conn = taos.connect(user='jiacy1_read', password='123') jiacy1_read_conn = taos.connect(user='jiacy1_read', password='123')
@ -114,7 +119,8 @@ class TDTestCase:
for user in [jiacy1_all_conn, jiacy1_read_conn, jiacy0_all_conn, jiacy0_read_conn]: for user in [jiacy1_all_conn, jiacy1_read_conn, jiacy0_all_conn, jiacy0_read_conn]:
user.execute(f'create topic db_jiacy as select * from db.stb') user.execute(f'create topic db_jiacy as select * from db.stb')
user.execute('drop topic db_jiacy') user.execute('drop topic db_jiacy')
for user in [jiacy1_write_conn,jiacy1_none_conn,jiacy0_write_conn,jiacy0_none_conn,jiacy1_all_conn,jiacy1_read_conn,jiacy0_all_conn,jiacy0_read_conn]: for user in [jiacy1_write_conn, jiacy1_none_conn, jiacy0_write_conn, jiacy0_none_conn, jiacy1_all_conn,
jiacy1_read_conn, jiacy0_all_conn, jiacy0_read_conn]:
sql_list = [] sql_list = []
if user in [jiacy1_all_conn, jiacy1_read_conn, jiacy0_all_conn, jiacy0_read_conn]: if user in [jiacy1_all_conn, jiacy1_read_conn, jiacy0_all_conn, jiacy0_read_conn]:
sql_list = ['drop topic root_db'] sql_list = ['drop topic root_db']
@ -134,27 +140,20 @@ class TDTestCase:
self.queryCols = 0 self.queryCols = 0
self.queryResult = None self.queryResult = None
tdLog.info(f"sql:{sql}, expect error occured") tdLog.info(f"sql:{sql}, expect error occured")
def tmq_commit_cb_print(tmq, resp, param=None): def tmq_commit_cb_print(tmq, resp, param=None):
print(f"commit: {resp}, tmq: {tmq}, param: {param}") print(f"commit: {resp}, tmq: {tmq}, param: {param}")
def subscribe_topic(self): def subscribe_topic(self):
print("create topic") print("create topic")
tdSql.execute('create topic db_topic as select * from db.stb') tdSql.execute('create topic db_topic as select * from db.stb')
tdSql.execute('grant subscribe on db_topic to jiacy1_all') tdSql.execute('grant subscribe on db_topic to jiacy1_all')
print("build consumer") print("build consumer")
conf = TaosTmqConf() tmq = Consumer({"group.id": "tg2", "td.connect.user": "jiacy1_all", "td.connect.pass": "123",
conf.set("group.id", "tg2") "enable.auto.commit": "true"})
conf.set("td.connect.user", "jiacy1_all")
conf.set("td.connect.pass", "123")
conf.set("enable.auto.commit", "true")
conf.set_auto_commit_cb(self.tmq_commit_cb_print, None)
tmq = conf.new_consumer()
print("build topic list") print("build topic list")
topic_list = TaosTmqList() tmq.subscribe(["db_topic"])
topic_list.append("db_topic")
print("basic consume loop") print("basic consume loop")
tmq.subscribe(topic_list)
sub_list = tmq.subscription()
print("subscribed topics: ", sub_list)
c = 0 c = 0
l = 0 l = 0
for i in range(10): for i in range(10):
@ -163,19 +162,22 @@ class TDTestCase:
res = tmq.poll(10) res = tmq.poll(10)
print(f"loop {l}") print(f"loop {l}")
l += 1 l += 1
if res: if not res:
print(f"received empty message at loop {l} (committed {c})")
continue
if res.error():
print(f"consumer error at loop {l} (committed {c}) {res.error()}")
continue
c += 1 c += 1
topic = res.get_topic_name() topic = res.topic()
vg = res.get_vgroup_id() db = res.database()
db = res.get_db_name() print(f"topic: {topic}\ndb: {db}")
print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
for row in res: for row in res:
print(row) print(row.fetchall())
print("* committed") print("* committed")
tmq.commit(res) tmq.commit(res)
else:
print(f"received empty message at loop {l} (committed {c})")
pass
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
@ -184,9 +186,11 @@ class TDTestCase:
self.drop_topic() self.drop_topic()
self.user_privilege_check() self.user_privilege_check()
self.subscribe_topic() self.subscribe_topic()
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase()) tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase()) tdCases.addLinux(__file__, TDTestCase())