build: delete TaosConsumer and TaosTmq from taospy (#20076)
This commit is contained in:
parent 8030188f40
commit 7e8823d594
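This commit removes the old TaosConsumer / TaosTmqConf / TaosTmqList helpers from the Python test code and moves everything onto the Consumer class that current taospy releases export from taos.tmq. As a minimal sketch of the pattern the hunks below converge on (not code from the diff itself; "demo_topic" is a placeholder topic name, while the group id and connection settings mirror the crash_gen hunk further down):

    from taos.tmq import Consumer

    # Configuration is now a plain dict instead of TaosTmqConf().set(...) calls.
    consumer = Consumer({
        "group.id": "tg2",
        "td.connect.user": "root",
        "td.connect.pass": "taosdata",
    })
    # Subscription takes a plain Python list instead of a TaosTmqList.
    consumer.subscribe(["demo_topic"])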
@ -18,7 +18,8 @@ from __future__ import annotations
|
||||||
from typing import Any, Set, Tuple
|
from typing import Any, Set, Tuple
|
||||||
from typing import Dict
|
from typing import Dict
|
||||||
from typing import List
|
from typing import List
|
||||||
from typing import Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none
|
from typing import \
|
||||||
|
Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none
|
||||||
|
|
||||||
import textwrap
|
import textwrap
|
||||||
import time
|
import time
|
||||||
|
@ -39,7 +40,6 @@ import gc
|
||||||
import taos
|
import taos
|
||||||
from taos.tmq import *
|
from taos.tmq import *
|
||||||
|
|
||||||
|
|
||||||
from .shared.types import TdColumns, TdTags
|
from .shared.types import TdColumns, TdTags
|
||||||
|
|
||||||
# from crash_gen import ServiceManager, TdeInstance, TdeSubProcess
|
# from crash_gen import ServiceManager, TdeInstance, TdeSubProcess
|
||||||
|
@ -69,6 +69,7 @@ gSvcMgr: Optional[ServiceManager] # TODO: refactor this hack, use dep injecti
|
||||||
# logger: logging.Logger
|
# logger: logging.Logger
|
||||||
gContainer: Container
|
gContainer: Container
|
||||||
|
|
||||||
|
|
||||||
# def runThread(wt: WorkerThread):
|
# def runThread(wt: WorkerThread):
|
||||||
# wt.run()
|
# wt.run()
|
||||||
|
|
||||||
|
@ -163,7 +164,6 @@ class WorkerThread:
|
||||||
Logging.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
|
Logging.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
# Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more)
|
# Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more)
|
||||||
try:
|
try:
|
||||||
if (Config.getConfig().per_thread_db_connection): # most likely TRUE
|
if (Config.getConfig().per_thread_db_connection): # most likely TRUE
|
||||||
|
@ -172,7 +172,8 @@ class WorkerThread:
|
||||||
# self.useDb() # might encounter exceptions. TODO: catch
|
# self.useDb() # might encounter exceptions. TODO: catch
|
||||||
except taos.error.ProgrammingError as err:
|
except taos.error.ProgrammingError as err:
|
||||||
errno = Helper.convertErrno(err.errno)
|
errno = Helper.convertErrno(err.errno)
|
||||||
if errno in [0x383, 0x386, 0x00B, 0x014] : # invalid database, dropping, Unable to establish connection, Database not ready
|
if errno in [0x383, 0x386, 0x00B,
|
||||||
|
0x014]: # invalid database, dropping, Unable to establish connection, Database not ready
|
||||||
# ignore
|
# ignore
|
||||||
dummy = 0
|
dummy = 0
|
||||||
else:
|
else:
|
||||||
|
@ -251,6 +252,7 @@ class WorkerThread:
|
||||||
# else:
|
# else:
|
||||||
# return self._tc.getDbState().getDbConn().query(sql)
|
# return self._tc.getDbState().getDbConn().query(sql)
|
||||||
|
|
||||||
|
|
||||||
# The coordinator of all worker threads, mostly running in main thread
|
# The coordinator of all worker threads, mostly running in main thread
|
||||||
|
|
||||||
|
|
||||||
|
@ -374,7 +376,8 @@ class ThreadCoordinator:
|
||||||
# TODO: saw an error here once, let's print out stack info for err?
|
# TODO: saw an error here once, let's print out stack info for err?
|
||||||
traceback.print_stack() # Stack frame to here.
|
traceback.print_stack() # Stack frame to here.
|
||||||
Logging.info("Caused by:")
|
Logging.info("Caused by:")
|
||||||
traceback.print_exception(*sys.exc_info()) # Ref: https://www.geeksforgeeks.org/how-to-print-exception-stack-trace-in-python/
|
traceback.print_exception(
|
||||||
|
*sys.exc_info()) # Ref: https://www.geeksforgeeks.org/how-to-print-exception-stack-trace-in-python/
|
||||||
transitionFailed = True
|
transitionFailed = True
|
||||||
self._te = None # Not running any more
|
self._te = None # Not running any more
|
||||||
self._execStats.registerFailure("State transition error: {}".format(err))
|
self._execStats.registerFailure("State transition error: {}".format(err))
|
||||||
|
@ -409,7 +412,6 @@ class ThreadCoordinator:
|
||||||
# print("\n")
|
# print("\n")
|
||||||
# print(h.heap())
|
# print(h.heap())
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._syncAtBarrier() # For now just cross the barrier
|
self._syncAtBarrier() # For now just cross the barrier
|
||||||
Progress.emit(Progress.END_THREAD_STEP)
|
Progress.emit(Progress.END_THREAD_STEP)
|
||||||
|
@ -492,7 +494,6 @@ class ThreadCoordinator:
|
||||||
self._execStats = None
|
self._execStats = None
|
||||||
self._runStatus = None
|
self._runStatus = None
|
||||||
|
|
||||||
|
|
||||||
def printStats(self):
|
def printStats(self):
|
||||||
self._execStats.printStats()
|
self._execStats.printStats()
|
||||||
|
|
||||||
|
@ -564,6 +565,7 @@ class ThreadCoordinator:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
self._executedTasks.append(task)
|
self._executedTasks.append(task)
|
||||||
|
|
||||||
|
|
||||||
class ThreadPool:
|
class ThreadPool:
|
||||||
def __init__(self, numThreads, maxSteps):
|
def __init__(self, numThreads, maxSteps):
|
||||||
self.numThreads = numThreads
|
self.numThreads = numThreads
|
||||||
|
@ -587,6 +589,7 @@ class ThreadPool:
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
self.threadList = [] # maybe clean up each?
|
self.threadList = [] # maybe clean up each?
|
||||||
|
|
||||||
|
|
||||||
# A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers
|
# A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers
|
||||||
# for new table names
|
# for new table names
|
||||||
|
|
||||||
|
@ -801,7 +804,8 @@ class AnyState:
|
||||||
for task in tasks:
|
for task in tasks:
|
||||||
if isinstance(task, cls):
|
if isinstance(task, cls):
|
||||||
raise CrashGenError(
|
raise CrashGenError(
|
||||||
"This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))
|
"This task: {}, is not expected to be present, given the success/failure of others".format(
|
||||||
|
cls.__name__))
|
||||||
|
|
||||||
def assertNoSuccess(self, tasks, cls):
|
def assertNoSuccess(self, tasks, cls):
|
||||||
for task in tasks:
|
for task in tasks:
|
||||||
|
@ -1016,7 +1020,6 @@ class StateMechine:
|
||||||
|
|
||||||
sTable = self._db.getFixedSuperTable()
|
sTable = self._db.getFixedSuperTable()
|
||||||
|
|
||||||
|
|
||||||
if sTable.hasRegTables(dbc): # no regular tables
|
if sTable.hasRegTables(dbc): # no regular tables
|
||||||
# print("debug=====*\n"*100)
|
# print("debug=====*\n"*100)
|
||||||
Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
|
Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
|
||||||
|
@ -1096,10 +1099,13 @@ class StateMechine:
|
||||||
weightsTypes = BasicTypes.copy()
|
weightsTypes = BasicTypes.copy()
|
||||||
|
|
||||||
# this matrixs can balance the Frequency of TaskTypes
|
# this matrixs can balance the Frequency of TaskTypes
|
||||||
balance_TaskType_matrixs = {'TaskDropDb': 5 , 'TaskDropTopics': 20 , 'TaskDropStreams':10 , 'TaskDropStreamTables':10 ,
|
balance_TaskType_matrixs = {'TaskDropDb': 5, 'TaskDropTopics': 20, 'TaskDropStreams': 10,
|
||||||
|
'TaskDropStreamTables': 10,
|
||||||
'TaskReadData': 50, 'TaskDropSuperTable': 5, 'TaskAlterTags': 3, 'TaskAddData': 10,
|
'TaskReadData': 50, 'TaskDropSuperTable': 5, 'TaskAlterTags': 3, 'TaskAddData': 10,
|
||||||
'TaskDeleteData':10 , 'TaskCreateDb':10 , 'TaskCreateStream': 3, 'TaskCreateTopic' :3,
|
'TaskDeleteData': 10, 'TaskCreateDb': 10, 'TaskCreateStream': 3,
|
||||||
'TaskCreateConsumers':10, 'TaskCreateSuperTable': 10 } # TaskType : balance_matrixs of task
|
'TaskCreateTopic': 3,
|
||||||
|
'TaskCreateConsumers': 10,
|
||||||
|
'TaskCreateSuperTable': 10} # TaskType : balance_matrixs of task
|
||||||
|
|
||||||
for task, weights in balance_TaskType_matrixs.items():
|
for task, weights in balance_TaskType_matrixs.items():
|
||||||
|
|
||||||
|
@ -1111,7 +1117,6 @@ class StateMechine:
|
||||||
task = random.sample(weightsTypes, 1)
|
task = random.sample(weightsTypes, 1)
|
||||||
return task[0]
|
return task[0]
|
||||||
|
|
||||||
|
|
||||||
# ref:
|
# ref:
|
||||||
# https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
|
# https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
|
||||||
def _weighted_choice_sub(self, weights) -> int:
|
def _weighted_choice_sub(self, weights) -> int:
|
||||||
|
@ -1123,6 +1128,7 @@ class StateMechine:
|
||||||
return i
|
return i
|
||||||
raise CrashGenError("Unexpected no choice")
|
raise CrashGenError("Unexpected no choice")
|
||||||
|
|
||||||
|
|
||||||
class Database:
|
class Database:
|
||||||
''' We use this to represent an actual TDengine database inside a service instance,
|
''' We use this to represent an actual TDengine database inside a service instance,
|
||||||
possibly in a cluster environment.
|
possibly in a cluster environment.
|
||||||
|
@ -1194,7 +1200,8 @@ class Database:
|
||||||
500 # a number representing seconds within 10 years
|
500 # a number representing seconds within 10 years
|
||||||
# print("elSec = {}".format(elSec))
|
# print("elSec = {}".format(elSec))
|
||||||
|
|
||||||
t3 = datetime.datetime(local_epoch_time[0]-10, local_epoch_time[1], local_epoch_time[2]) # default "keep" is 10 years
|
t3 = datetime.datetime(local_epoch_time[0] - 10, local_epoch_time[1],
|
||||||
|
local_epoch_time[2]) # default "keep" is 10 years
|
||||||
t4 = datetime.datetime.fromtimestamp(
|
t4 = datetime.datetime.fromtimestamp(
|
||||||
t3.timestamp() + elSec2) # see explanation above
|
t3.timestamp() + elSec2) # see explanation above
|
||||||
Logging.debug("Setting up TICKS to start from: {}".format(t4))
|
Logging.debug("Setting up TICKS to start from: {}".format(t4))
|
||||||
|
@ -1210,11 +1217,14 @@ class Database:
|
||||||
# 10k at 1/20 chance, should be enough to avoid overlaps
|
# 10k at 1/20 chance, should be enough to avoid overlaps
|
||||||
tick = cls.setupLastTick()
|
tick = cls.setupLastTick()
|
||||||
cls._lastTick = tick
|
cls._lastTick = tick
|
||||||
cls._lastLaggingTick = tick + datetime.timedelta(0, -60*2) # lagging behind 2 minutes, should catch up fast
|
cls._lastLaggingTick = tick + datetime.timedelta(0,
|
||||||
|
-60 * 2) # lagging behind 2 minutes, should catch up fast
|
||||||
# if : # should be quite a bit into the future
|
# if : # should be quite a bit into the future
|
||||||
|
|
||||||
if Config.isSet('mix_oos_data') and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick
|
if Config.isSet('mix_oos_data') and Dice.throw(
|
||||||
cls._lastLaggingTick += datetime.timedelta(0, 1) # pick the next sequence from the lagging tick sequence
|
20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick
|
||||||
|
cls._lastLaggingTick += datetime.timedelta(0,
|
||||||
|
1) # pick the next sequence from the lagging tick sequence
|
||||||
return cls._lastLaggingTick
|
return cls._lastLaggingTick
|
||||||
else: # regular
|
else: # regular
|
||||||
# add one second to it
|
# add one second to it
|
||||||
|
@ -1334,8 +1344,6 @@ class Task():
|
||||||
self._execStats = execStats
|
self._execStats = execStats
|
||||||
self._db = db # A task is always associated/for a specific DB
|
self._db = db # A task is always associated/for a specific DB
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def isSuccess(self):
|
def isSuccess(self):
|
||||||
return self._err is None
|
return self._err is None
|
||||||
|
|
||||||
|
@ -1417,9 +1425,6 @@ class Task():
|
||||||
0x0203, # Invalid value
|
0x0203, # Invalid value
|
||||||
0x03f0, # Stream already exist , topic already exists
|
0x03f0, # Stream already exist , topic already exists
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
1000 # REST catch-all error
|
1000 # REST catch-all error
|
||||||
]:
|
]:
|
||||||
return True # These are the ALWAYS-ACCEPTABLE ones
|
return True # These are the ALWAYS-ACCEPTABLE ones
|
||||||
|
@ -1443,7 +1448,6 @@ class Task():
|
||||||
|
|
||||||
return False # Not an acceptable error
|
return False # Not an acceptable error
|
||||||
|
|
||||||
|
|
||||||
def execute(self, wt: WorkerThread):
|
def execute(self, wt: WorkerThread):
|
||||||
wt.verifyThreadSelf()
|
wt.verifyThreadSelf()
|
||||||
self._workerThread = wt # type: ignore
|
self._workerThread = wt # type: ignore
|
||||||
|
@ -1485,7 +1489,8 @@ class Task():
|
||||||
# raise # so that we see full stack
|
# raise # so that we see full stack
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
print(
|
print(
|
||||||
"\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
|
"\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(
|
||||||
|
errMsg) +
|
||||||
"----------------------------\n")
|
"----------------------------\n")
|
||||||
# sys.exit(-1)
|
# sys.exit(-1)
|
||||||
self._err = err
|
self._err = err
|
||||||
|
@ -1718,10 +1723,15 @@ class TaskCreateDb(StateTransitionTask):
|
||||||
cache_model = Dice.choice(['none', 'last_row', 'last_value', 'both'])
|
cache_model = Dice.choice(['none', 'last_row', 'last_value', 'both'])
|
||||||
buffer = random.randint(3, 128)
|
buffer = random.randint(3, 128)
|
||||||
dbName = self._db.getName()
|
dbName = self._db.getName()
|
||||||
self.execWtSql(wt, "create database {} {} {} vgroups {} cachemodel '{}' buffer {} ".format(dbName, repStr, updatePostfix, vg_nums, cache_model,buffer ) )
|
self.execWtSql(wt, "create database {} {} {} vgroups {} cachemodel '{}' buffer {} ".format(dbName, repStr,
|
||||||
|
updatePostfix,
|
||||||
|
vg_nums,
|
||||||
|
cache_model,
|
||||||
|
buffer))
|
||||||
if dbName == "db_0" and Config.getConfig().use_shadow_db:
|
if dbName == "db_0" and Config.getConfig().use_shadow_db:
|
||||||
self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix))
|
self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix))
|
||||||
|
|
||||||
|
|
||||||
class TaskDropDb(StateTransitionTask):
|
class TaskDropDb(StateTransitionTask):
|
||||||
@classmethod
|
@classmethod
|
||||||
def getEndState(cls):
|
def getEndState(cls):
|
||||||
|
@ -1734,7 +1744,8 @@ class TaskDropDb(StateTransitionTask):
|
||||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.queryWtSql(wt, "drop database {}".format(self._db.getName())) # drop database maybe failed ,because topic exists
|
self.queryWtSql(wt, "drop database {}".format(
|
||||||
|
self._db.getName())) # drop database maybe failed ,because topic exists
|
||||||
except taos.error.ProgrammingError as err:
|
except taos.error.ProgrammingError as err:
|
||||||
errno = Helper.convertErrno(err.errno)
|
errno = Helper.convertErrno(err.errno)
|
||||||
if errno in [0x0203]: # drop maybe failed
|
if errno in [0x0203]: # drop maybe failed
|
||||||
|
@ -1769,7 +1780,8 @@ class TaskCreateStream(StateTransitionTask):
|
||||||
stbname = sTable.getName()
|
stbname = sTable.getName()
|
||||||
sub_tables = sTable.getRegTables(wt.getDbConn())
|
sub_tables = sTable.getRegTables(wt.getDbConn())
|
||||||
aggExpr = Dice.choice([
|
aggExpr = Dice.choice([
|
||||||
'count(*)', 'avg(speed)', 'sum(speed)', 'stddev(speed)','min(speed)', 'max(speed)', 'first(speed)', 'last(speed)',
|
'count(*)', 'avg(speed)', 'sum(speed)', 'stddev(speed)', 'min(speed)', 'max(speed)', 'first(speed)',
|
||||||
|
'last(speed)',
|
||||||
'apercentile(speed, 10)', 'last_row(*)', 'twa(speed)'])
|
'apercentile(speed, 10)', 'last_row(*)', 'twa(speed)'])
|
||||||
|
|
||||||
stream_sql = '' # set default value
|
stream_sql = '' # set default value
|
||||||
|
@ -1814,19 +1826,25 @@ class TaskCreateTopic(StateTransitionTask):
|
||||||
stbname = sTable.getName()
|
stbname = sTable.getName()
|
||||||
sub_tables = sTable.getRegTables(wt.getDbConn())
|
sub_tables = sTable.getRegTables(wt.getDbConn())
|
||||||
|
|
||||||
scalarExpr = Dice.choice([ '*','speed','color','abs(speed)','acos(speed)','asin(speed)','atan(speed)','ceil(speed)','cos(speed)','cos(speed)',
|
scalarExpr = Dice.choice(
|
||||||
'floor(speed)','log(speed,2)','pow(speed,2)','round(speed)','sin(speed)','sqrt(speed)','char_length(color)','concat(color,color)',
|
['*', 'speed', 'color', 'abs(speed)', 'acos(speed)', 'asin(speed)', 'atan(speed)', 'ceil(speed)',
|
||||||
'concat_ws(" ", color,color," ")','length(color)', 'lower(color)', 'ltrim(color)','substr(color , 2)','upper(color)','cast(speed as double)',
|
'cos(speed)', 'cos(speed)',
|
||||||
|
'floor(speed)', 'log(speed,2)', 'pow(speed,2)', 'round(speed)', 'sin(speed)', 'sqrt(speed)',
|
||||||
|
'char_length(color)', 'concat(color,color)',
|
||||||
|
'concat_ws(" ", color,color," ")', 'length(color)', 'lower(color)', 'ltrim(color)', 'substr(color , 2)',
|
||||||
|
'upper(color)', 'cast(speed as double)',
|
||||||
'cast(ts as bigint)'])
|
'cast(ts as bigint)'])
|
||||||
topic_sql = '' # set default value
|
topic_sql = '' # set default value
|
||||||
if Dice.throw(3) == 0: # create topic : source data from sub query
|
if Dice.throw(3) == 0: # create topic : source data from sub query
|
||||||
if sub_tables: # if not empty
|
if sub_tables: # if not empty
|
||||||
sub_tbname = sub_tables[0]
|
sub_tbname = sub_tables[0]
|
||||||
# create topic : source data from sub query of sub stable
|
# create topic : source data from sub query of sub stable
|
||||||
topic_sql = 'create topic {} as select {} FROM {}.{} ; '.format(sub_topic_name,scalarExpr,dbname,sub_tbname)
|
topic_sql = 'create topic {} as select {} FROM {}.{} ; '.format(sub_topic_name, scalarExpr, dbname,
|
||||||
|
sub_tbname)
|
||||||
|
|
||||||
else: # create topic : source data from sub query of stable
|
else: # create topic : source data from sub query of stable
|
||||||
topic_sql = 'create topic {} as select {} FROM {}.{} '.format(super_topic_name,scalarExpr, dbname,stbname)
|
topic_sql = 'create topic {} as select {} FROM {}.{} '.format(super_topic_name, scalarExpr, dbname,
|
||||||
|
stbname)
|
||||||
elif Dice.throw(3) == 1: # create topic : source data from super table
|
elif Dice.throw(3) == 1: # create topic : source data from super table
|
||||||
topic_sql = 'create topic {} AS STABLE {}.{} '.format(stable_topic, dbname, stbname)
|
topic_sql = 'create topic {} AS STABLE {}.{} '.format(stable_topic, dbname, stbname)
|
||||||
|
|
||||||
|
@ -1840,6 +1858,7 @@ class TaskCreateTopic(StateTransitionTask):
|
||||||
self.execWtSql(wt, topic_sql)
|
self.execWtSql(wt, topic_sql)
|
||||||
Logging.debug("[OPS] db topic is creating at {}".format(time.time()))
|
Logging.debug("[OPS] db topic is creating at {}".format(time.time()))
|
||||||
|
|
||||||
|
|
||||||
class TaskDropTopics(StateTransitionTask):
|
class TaskDropTopics(StateTransitionTask):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -1853,7 +1872,6 @@ class TaskDropTopics(StateTransitionTask):
|
||||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||||
dbname = self._db.getName()
|
dbname = self._db.getName()
|
||||||
|
|
||||||
|
|
||||||
if not self._db.exists(wt.getDbConn()):
|
if not self._db.exists(wt.getDbConn()):
|
||||||
Logging.debug("Skipping task, no DB yet")
|
Logging.debug("Skipping task, no DB yet")
|
||||||
return
|
return
|
||||||
|
@ -1865,6 +1883,7 @@ class TaskDropTopics(StateTransitionTask):
|
||||||
sTable.dropTopics(wt.getDbConn(), dbname, None) # drop topics of database
|
sTable.dropTopics(wt.getDbConn(), dbname, None) # drop topics of database
|
||||||
sTable.dropTopics(wt.getDbConn(), dbname, tblName) # drop topics of stable
|
sTable.dropTopics(wt.getDbConn(), dbname, tblName) # drop topics of stable
|
||||||
|
|
||||||
|
|
||||||
class TaskDropStreams(StateTransitionTask):
|
class TaskDropStreams(StateTransitionTask):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -1878,7 +1897,6 @@ class TaskDropStreams(StateTransitionTask):
|
||||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||||
# dbname = self._db.getName()
|
# dbname = self._db.getName()
|
||||||
|
|
||||||
|
|
||||||
if not self._db.exists(wt.getDbConn()):
|
if not self._db.exists(wt.getDbConn()):
|
||||||
Logging.debug("Skipping task, no DB yet")
|
Logging.debug("Skipping task, no DB yet")
|
||||||
return
|
return
|
||||||
|
@ -1889,6 +1907,7 @@ class TaskDropStreams(StateTransitionTask):
|
||||||
if sTable.hasStreams(wt.getDbConn()):
|
if sTable.hasStreams(wt.getDbConn()):
|
||||||
sTable.dropStreams(wt.getDbConn()) # drop stream of database
|
sTable.dropStreams(wt.getDbConn()) # drop stream of database
|
||||||
|
|
||||||
|
|
||||||
class TaskDropStreamTables(StateTransitionTask):
|
class TaskDropStreamTables(StateTransitionTask):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -1902,7 +1921,6 @@ class TaskDropStreamTables(StateTransitionTask):
|
||||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||||
# dbname = self._db.getName()
|
# dbname = self._db.getName()
|
||||||
|
|
||||||
|
|
||||||
if not self._db.exists(wt.getDbConn()):
|
if not self._db.exists(wt.getDbConn()):
|
||||||
Logging.debug("Skipping task, no DB yet")
|
Logging.debug("Skipping task, no DB yet")
|
||||||
return
|
return
|
||||||
|
@ -1913,6 +1931,7 @@ class TaskDropStreamTables(StateTransitionTask):
|
||||||
if sTable.hasStreamTables(wt.getDbConn()):
|
if sTable.hasStreamTables(wt.getDbConn()):
|
||||||
sTable.dropStreamTables(wt.getDbConn()) # drop stream tables
|
sTable.dropStreamTables(wt.getDbConn()) # drop stream tables
|
||||||
|
|
||||||
|
|
||||||
class TaskCreateConsumers(StateTransitionTask):
|
class TaskCreateConsumers(StateTransitionTask):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -1974,7 +1993,6 @@ class TdSuperTable:
|
||||||
def getName(self):
|
def getName(self):
|
||||||
return self._stName
|
return self._stName
|
||||||
|
|
||||||
|
|
||||||
def drop(self, dbc, skipCheck=False):
|
def drop(self, dbc, skipCheck=False):
|
||||||
dbName = self._dbName
|
dbName = self._dbName
|
||||||
if self.exists(dbc): # if myself exists
|
if self.exists(dbc): # if myself exists
|
||||||
|
@@ -2018,27 +2036,18 @@ class TdSuperTable:
     def createConsumer(self, dbc, Consumer_nums):

         def generateConsumer(current_topic_list):
-            conf = TaosTmqConf()
-            conf.set("group.id", "tg2")
-            conf.set("td.connect.user", "root")
-            conf.set("td.connect.pass", "taosdata")
-            # conf.set("enable.auto.commit", "true")
-            # def tmq_commit_cb_print(tmq, resp, offset, param=None):
-            #     print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
-            # conf.set_auto_commit_cb(tmq_commit_cb_print, None)
-            consumer = conf.new_consumer()
-            topic_list = TaosTmqList()
+            consumer = Consumer({"group.id": "tg2", "td.connect.user": "root", "td.connect.pass": "taosdata"})
+            topic_list = []
             for topic in current_topic_list:
                 topic_list.append(topic)
-            try:
-                consumer.subscribe(topic_list)
-            except TmqError as e :
-                pass
+            consumer.subscribe(topic_list)

             # consumer with random work life
             time_start = time.time()
             while 1:
-                res = consumer.poll(1000)
+                res = consumer.poll(1)
+                consumer.commit(res)
                 if time.time() - time_start > random.randint(5, 50):
                     break
             try:
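Two details of the new consumer API show up in this hunk: the poll timeout argument shrank from 1000 to 1, consistent with Consumer.poll() taking seconds rather than milliseconds, and the result is committed explicitly on each iteration. A sketch of the same randomized "work life" loop with an explicit empty-poll check (poll() returns None when nothing arrives in time; "demo_topic" and the standalone setup are placeholders, not code from the diff):

    import random
    import time

    from taos.tmq import Consumer

    consumer = Consumer({"group.id": "tg2", "td.connect.user": "root", "td.connect.pass": "taosdata"})
    consumer.subscribe(["demo_topic"])

    time_start = time.time()
    while True:
        res = consumer.poll(1)  # timeout in seconds
        if res:                 # skip the commit when the poll came back empty
            consumer.commit(res)
        if time.time() - time_start > random.randint(5, 50):
            break               # same randomized lifetime as the hunk above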
@ -2067,14 +2076,16 @@ class TdSuperTable:
|
||||||
def getRegTables(self, dbc: DbConn):
|
def getRegTables(self, dbc: DbConn):
|
||||||
dbName = self._dbName
|
dbName = self._dbName
|
||||||
try:
|
try:
|
||||||
dbc.query("select distinct TBNAME from {}.{}".format(dbName, self._stName)) # TODO: analyze result set later
|
dbc.query("select distinct TBNAME from {}.{}".format(dbName,
|
||||||
|
self._stName)) # TODO: analyze result set later
|
||||||
except taos.error.ProgrammingError as err:
|
except taos.error.ProgrammingError as err:
|
||||||
errno2 = Helper.convertErrno(err.errno)
|
errno2 = Helper.convertErrno(err.errno)
|
||||||
Logging.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
|
Logging.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
qr = dbc.getQueryResult()
|
qr = dbc.getQueryResult()
|
||||||
return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation
|
return [v[0] for v in
|
||||||
|
qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation
|
||||||
|
|
||||||
def hasRegTables(self, dbc: DbConn):
|
def hasRegTables(self, dbc: DbConn):
|
||||||
|
|
||||||
|
@ -2317,7 +2328,6 @@ class TdSuperTable:
|
||||||
|
|
||||||
]) # TODO: add more from 'top'
|
]) # TODO: add more from 'top'
|
||||||
|
|
||||||
|
|
||||||
# if aggExpr not in ['stddev(speed)']: # STDDEV not valid for super tables?! (Done in TD-1049)
|
# if aggExpr not in ['stddev(speed)']: # STDDEV not valid for super tables?! (Done in TD-1049)
|
||||||
sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName())
|
sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName())
|
||||||
if Dice.throw(3) == 0: # 1 in X chance
|
if Dice.throw(3) == 0: # 1 in X chance
|
||||||
|
@ -2329,6 +2339,7 @@ class TdSuperTable:
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
|
||||||
class TaskReadData(StateTransitionTask):
|
class TaskReadData(StateTransitionTask):
|
||||||
@classmethod
|
@classmethod
|
||||||
def getEndState(cls):
|
def getEndState(cls):
|
||||||
|
@ -2368,7 +2379,6 @@ class TaskReadData(StateTransitionTask):
|
||||||
# by now, causing error below to be incorrectly handled due to timing issue
|
# by now, causing error below to be incorrectly handled due to timing issue
|
||||||
return # TODO: fix server restart status race condtion
|
return # TODO: fix server restart status race condtion
|
||||||
|
|
||||||
|
|
||||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||||
self._reconnectIfNeeded(wt)
|
self._reconnectIfNeeded(wt)
|
||||||
|
|
||||||
|
@ -2386,6 +2396,7 @@ class TaskReadData(StateTransitionTask):
|
||||||
Logging.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
|
Logging.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
class SqlQuery:
|
class SqlQuery:
|
||||||
@classmethod
|
@classmethod
|
||||||
def buildRandom(cls, db: Database):
|
def buildRandom(cls, db: Database):
|
||||||
|
@ -2399,6 +2410,7 @@ class SqlQuery:
|
||||||
def getSql(self):
|
def getSql(self):
|
||||||
return self._sql
|
return self._sql
|
||||||
|
|
||||||
|
|
||||||
class TaskDropSuperTable(StateTransitionTask):
|
class TaskDropSuperTable(StateTransitionTask):
|
||||||
@classmethod
|
@classmethod
|
||||||
def getEndState(cls):
|
def getEndState(cls):
|
||||||
|
@ -2430,7 +2442,6 @@ class TaskDropSuperTable(StateTransitionTask):
|
||||||
Logging.debug("[DB] Acceptable error when dropping a table")
|
Logging.debug("[DB] Acceptable error when dropping a table")
|
||||||
continue # try to delete next regular table
|
continue # try to delete next regular table
|
||||||
|
|
||||||
|
|
||||||
if (not tickOutput):
|
if (not tickOutput):
|
||||||
tickOutput = True # Print only one time
|
tickOutput = True # Print only one time
|
||||||
if isSuccess:
|
if isSuccess:
|
||||||
|
@ -2443,8 +2454,6 @@ class TaskDropSuperTable(StateTransitionTask):
|
||||||
self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName))
|
self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class TaskAlterTags(StateTransitionTask):
|
class TaskAlterTags(StateTransitionTask):
|
||||||
@classmethod
|
@classmethod
|
||||||
def getEndState(cls):
|
def getEndState(cls):
|
||||||
|
@ -2472,6 +2481,7 @@ class TaskAlterTags(StateTransitionTask):
|
||||||
sTable.changeTag(dbc, "extraTag", "newTag")
|
sTable.changeTag(dbc, "extraTag", "newTag")
|
||||||
# sql = "alter table db.{} change tag extraTag newTag".format(tblName)
|
# sql = "alter table db.{} change tag extraTag newTag".format(tblName)
|
||||||
|
|
||||||
|
|
||||||
class TaskRestartService(StateTransitionTask):
|
class TaskRestartService(StateTransitionTask):
|
||||||
_isRunning = False
|
_isRunning = False
|
||||||
_classLock = threading.Lock()
|
_classLock = threading.Lock()
|
||||||
|
@ -2487,6 +2497,7 @@ class TaskRestartService(StateTransitionTask):
|
||||||
return False # don't run this otherwise
|
return False # don't run this otherwise
|
||||||
|
|
||||||
CHANCE_TO_RESTART_SERVICE = 200
|
CHANCE_TO_RESTART_SERVICE = 200
|
||||||
|
|
||||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||||
if not Config.getConfig().auto_start_service: # only execute when we are in -a mode
|
if not Config.getConfig().auto_start_service: # only execute when we are in -a mode
|
||||||
print("_a", end="", flush=True)
|
print("_a", end="", flush=True)
|
||||||
|
@ -2500,11 +2511,13 @@ class TaskRestartService(StateTransitionTask):
|
||||||
|
|
||||||
if Dice.throw(self.CHANCE_TO_RESTART_SERVICE) == 0: # 1 in N chance
|
if Dice.throw(self.CHANCE_TO_RESTART_SERVICE) == 0: # 1 in N chance
|
||||||
dbc = wt.getDbConn()
|
dbc = wt.getDbConn()
|
||||||
dbc.execute("select * from information_schema.ins_databases") # simple delay, align timing with other workers
|
dbc.execute(
|
||||||
|
"select * from information_schema.ins_databases") # simple delay, align timing with other workers
|
||||||
gSvcMgr.restart()
|
gSvcMgr.restart()
|
||||||
|
|
||||||
self._isRunning = False
|
self._isRunning = False
|
||||||
|
|
||||||
|
|
||||||
class TaskAddData(StateTransitionTask):
|
class TaskAddData(StateTransitionTask):
|
||||||
# Track which table is being actively worked on
|
# Track which table is being actively worked on
|
||||||
activeTable: Set[int] = set()
|
activeTable: Set[int] = set()
|
||||||
|
@ -2571,8 +2584,6 @@ class TaskAddData(StateTransitionTask):
|
||||||
# Logging.info("Data added in batch: {}".format(sql))
|
# Logging.info("Data added in batch: {}".format(sql))
|
||||||
self._unlockTableIfNeeded(fullTableName)
|
self._unlockTableIfNeeded(fullTableName)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
|
def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
|
||||||
numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
|
numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
|
||||||
|
|
||||||
|
@ -2590,7 +2601,8 @@ class TaskAddData(StateTransitionTask):
|
||||||
|
|
||||||
# TODO: too ugly trying to lock the table reliably, refactor...
|
# TODO: too ugly trying to lock the table reliably, refactor...
|
||||||
fullTableName = db.getName() + '.' + regTableName
|
fullTableName = db.getName() + '.' + regTableName
|
||||||
self._lockTableIfNeeded(fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock
|
self._lockTableIfNeeded(
|
||||||
|
fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock
|
||||||
|
|
||||||
try:
|
try:
|
||||||
sql = "INSERT INTO {} VALUES ('{}', {}, '{}');".format( # removed: tags ('{}', {})
|
sql = "INSERT INTO {} VALUES ('{}', {}, '{}');".format( # removed: tags ('{}', {})
|
||||||
|
@ -2604,7 +2616,8 @@ class TaskAddData(StateTransitionTask):
|
||||||
intWrote = intToWrite
|
intWrote = intToWrite
|
||||||
|
|
||||||
# Quick hack, attach an update statement here. TODO: create an "update" task
|
# Quick hack, attach an update statement here. TODO: create an "update" task
|
||||||
if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB
|
if (not Config.getConfig().use_shadow_db) and Dice.throw(
|
||||||
|
5) == 0: # 1 in N chance, plus not using shaddow DB
|
||||||
intToUpdate = db.getNextInt() # Updated, but should not succeed
|
intToUpdate = db.getNextInt() # Updated, but should not succeed
|
||||||
nextColor = db.getNextColor()
|
nextColor = db.getNextColor()
|
||||||
sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here
|
sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here
|
||||||
|
@ -2692,6 +2705,7 @@ class TaskAddData(StateTransitionTask):
|
||||||
|
|
||||||
self.activeTable.discard(i) # not raising an error, unlike remove
|
self.activeTable.discard(i) # not raising an error, unlike remove
|
||||||
|
|
||||||
|
|
||||||
class TaskDeleteData(StateTransitionTask):
|
class TaskDeleteData(StateTransitionTask):
|
||||||
# Track which table is being actively worked on
|
# Track which table is being actively worked on
|
||||||
activeTable: Set[int] = set()
|
activeTable: Set[int] = set()
|
||||||
|
@ -2756,7 +2770,8 @@ class TaskDeleteData(StateTransitionTask):
|
||||||
|
|
||||||
# TODO: too ugly trying to lock the table reliably, refactor...
|
# TODO: too ugly trying to lock the table reliably, refactor...
|
||||||
fullTableName = db.getName() + '.' + regTableName
|
fullTableName = db.getName() + '.' + regTableName
|
||||||
self._lockTableIfNeeded(fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock
|
self._lockTableIfNeeded(
|
||||||
|
fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock
|
||||||
|
|
||||||
try:
|
try:
|
||||||
sql = "delete from {} where ts = '{}' ;".format( # removed: tags ('{}', {})
|
sql = "delete from {} where ts = '{}' ;".format( # removed: tags ('{}', {})
|
||||||
|
@ -2772,7 +2787,8 @@ class TaskDeleteData(StateTransitionTask):
|
||||||
intWrote = intToWrite
|
intWrote = intToWrite
|
||||||
|
|
||||||
# Quick hack, attach an update statement here. TODO: create an "update" task
|
# Quick hack, attach an update statement here. TODO: create an "update" task
|
||||||
if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB
|
if (not Config.getConfig().use_shadow_db) and Dice.throw(
|
||||||
|
5) == 0: # 1 in N chance, plus not using shaddow DB
|
||||||
intToUpdate = db.getNextInt() # Updated, but should not succeed
|
intToUpdate = db.getNextInt() # Updated, but should not succeed
|
||||||
# nextColor = db.getNextColor()
|
# nextColor = db.getNextColor()
|
||||||
sql = "delete from {} where ts = '{}' ;".format( # "INSERt" means "update" here
|
sql = "delete from {} where ts = '{}' ;".format( # "INSERt" means "update" here
|
||||||
|
@ -2827,7 +2843,8 @@ class TaskDeleteData(StateTransitionTask):
|
||||||
|
|
||||||
# TODO: too ugly trying to lock the table reliably, refactor...
|
# TODO: too ugly trying to lock the table reliably, refactor...
|
||||||
fullTableName = db.getName() + '.' + regTableName
|
fullTableName = db.getName() + '.' + regTableName
|
||||||
self._lockTableIfNeeded(fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock
|
self._lockTableIfNeeded(
|
||||||
|
fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock
|
||||||
|
|
||||||
try:
|
try:
|
||||||
sql = "delete from {} ;".format( # removed: tags ('{}', {})
|
sql = "delete from {} ;".format( # removed: tags ('{}', {})
|
||||||
|
@ -2837,7 +2854,8 @@ class TaskDeleteData(StateTransitionTask):
|
||||||
# Logging.info("Data added: {}".format(sql))
|
# Logging.info("Data added: {}".format(sql))
|
||||||
|
|
||||||
# Quick hack, attach an update statement here. TODO: create an "update" task
|
# Quick hack, attach an update statement here. TODO: create an "update" task
|
||||||
if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB
|
if (not Config.getConfig().use_shadow_db) and Dice.throw(
|
||||||
|
5) == 0: # 1 in N chance, plus not using shaddow DB
|
||||||
sql = "delete from {} ;".format( # "INSERt" means "update" here
|
sql = "delete from {} ;".format( # "INSERt" means "update" here
|
||||||
fullTableName)
|
fullTableName)
|
||||||
dbc.execute(sql)
|
dbc.execute(sql)
|
||||||
|
@ -2937,7 +2955,9 @@ class ThreadStacks: # stack info for all threads
|
||||||
lastSqlForThread = DbConn.fetchSqlForThread(shortTid)
|
lastSqlForThread = DbConn.fetchSqlForThread(shortTid)
|
||||||
last_sql_commit_time = DbConn.get_save_sql_time(shortTid)
|
last_sql_commit_time = DbConn.get_save_sql_time(shortTid)
|
||||||
# time_cost = DbConn.get_time_cost()
|
# time_cost = DbConn.get_time_cost()
|
||||||
print("Last SQL statement attempted from thread {} ({:.4f} sec ago) is: {}".format(shortTid, self.current_time-last_sql_commit_time ,lastSqlForThread))
|
print("Last SQL statement attempted from thread {} ({:.4f} sec ago) is: {}".format(shortTid,
|
||||||
|
self.current_time - last_sql_commit_time,
|
||||||
|
lastSqlForThread))
|
||||||
stackFrame = 0
|
stackFrame = 0
|
||||||
for frame in stack: # was using: reversed(stack)
|
for frame in stack: # was using: reversed(stack)
|
||||||
# print(frame)
|
# print(frame)
|
||||||
|
@ -2949,6 +2969,7 @@ class ThreadStacks: # stack info for all threads
|
||||||
if self.current_time - last_sql_commit_time > 100: # dead lock occured
|
if self.current_time - last_sql_commit_time > 100: # dead lock occured
|
||||||
print("maybe dead locked of thread {} ".format(shortTid))
|
print("maybe dead locked of thread {} ".format(shortTid))
|
||||||
|
|
||||||
|
|
||||||
class ClientManager:
|
class ClientManager:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
Logging.info("Starting service manager")
|
Logging.info("Starting service manager")
|
||||||
|
@ -3062,7 +3083,6 @@ class ClientManager:
|
||||||
svcMgr.stopTaosServices()
|
svcMgr.stopTaosServices()
|
||||||
svcMgr = None
|
svcMgr = None
|
||||||
|
|
||||||
|
|
||||||
# Release global variables
|
# Release global variables
|
||||||
# gConfig = None
|
# gConfig = None
|
||||||
Config.clearConfig()
|
Config.clearConfig()
|
||||||
|
@ -3093,6 +3113,7 @@ class ClientManager:
|
||||||
# self.tc.getDbManager().cleanUp() # clean up first, so we can show ZERO db connections
|
# self.tc.getDbManager().cleanUp() # clean up first, so we can show ZERO db connections
|
||||||
self.tc.printStats()
|
self.tc.printStats()
|
||||||
|
|
||||||
|
|
||||||
class MainExec:
|
class MainExec:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._clientMgr = None
|
self._clientMgr = None
|
||||||
|
@ -3131,7 +3152,8 @@ class MainExec:
|
||||||
|
|
||||||
def runService(self):
|
def runService(self):
|
||||||
global gSvcMgr
|
global gSvcMgr
|
||||||
gSvcMgr = self._svcMgr = ServiceManager(Config.getConfig().num_dnodes) # save it in a global variable TODO: hack alert
|
gSvcMgr = self._svcMgr = ServiceManager(
|
||||||
|
Config.getConfig().num_dnodes) # save it in a global variable TODO: hack alert
|
||||||
|
|
||||||
gSvcMgr.run() # run to some end state
|
gSvcMgr.run() # run to some end state
|
||||||
gSvcMgr = self._svcMgr = None
|
gSvcMgr = self._svcMgr = None
|
||||||
|
@ -3259,7 +3281,6 @@ class MainExec:
|
||||||
|
|
||||||
return parser
|
return parser
|
||||||
|
|
||||||
|
|
||||||
def init(self): # TODO: refactor
|
def init(self): # TODO: refactor
|
||||||
global gContainer
|
global gContainer
|
||||||
gContainer = Container() # micky-mouse DI
|
gContainer = Container() # micky-mouse DI
|
||||||
|
|
|
@@ -12,12 +12,13 @@
 # -*- coding: utf-8 -*-

 import taos
-from util.log import *
-from util.cases import *
-from util.sql import *
-from util.common import *
-from util.sqlset import *
 from taos.tmq import *
+from util.cases import *
+from util.common import *
+from util.log import *
+from util.sql import *
+from util.sqlset import *


 class TDTestCase:
     def init(self, conn, logSql, replicaVar=1):
@ -67,14 +68,17 @@ class TDTestCase:
|
||||||
f'now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据"'
|
f'now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据"'
|
||||||
]
|
]
|
||||||
self.tbnum = 1
|
self.tbnum = 1
|
||||||
|
|
||||||
def prepare_data(self):
|
def prepare_data(self):
|
||||||
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict))
|
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict))
|
||||||
for i in range(self.tbnum):
|
for i in range(self.tbnum):
|
||||||
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})')
|
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})')
|
||||||
for j in self.values_list:
|
for j in self.values_list:
|
||||||
tdSql.execute(f'insert into {self.stbname}_{i} values({j})')
|
tdSql.execute(f'insert into {self.stbname}_{i} values({j})')
|
||||||
|
|
||||||
def create_user(self):
|
def create_user(self):
|
||||||
for user_name in ['jiacy1_all','jiacy1_read','jiacy1_write','jiacy1_none','jiacy0_all','jiacy0_read','jiacy0_write','jiacy0_none']:
|
for user_name in ['jiacy1_all', 'jiacy1_read', 'jiacy1_write', 'jiacy1_none', 'jiacy0_all', 'jiacy0_read',
|
||||||
|
'jiacy0_write', 'jiacy0_none']:
|
||||||
if 'jiacy1' in user_name.lower():
|
if 'jiacy1' in user_name.lower():
|
||||||
tdSql.execute(f'create user {user_name} pass "123" sysinfo 1')
|
tdSql.execute(f'create user {user_name} pass "123" sysinfo 1')
|
||||||
elif 'jiacy0' in user_name.lower():
|
elif 'jiacy0' in user_name.lower():
|
||||||
|
@ -101,6 +105,7 @@ class TDTestCase:
|
||||||
self.queryResult = None
|
self.queryResult = None
|
||||||
tdLog.info(f"sql:{sql}, expect error occured")
|
tdLog.info(f"sql:{sql}, expect error occured")
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def drop_topic(self):
|
def drop_topic(self):
|
||||||
jiacy1_all_conn = taos.connect(user='jiacy1_all', password='123')
|
jiacy1_all_conn = taos.connect(user='jiacy1_all', password='123')
|
||||||
jiacy1_read_conn = taos.connect(user='jiacy1_read', password='123')
|
jiacy1_read_conn = taos.connect(user='jiacy1_read', password='123')
|
||||||
|
@ -114,7 +119,8 @@ class TDTestCase:
|
||||||
for user in [jiacy1_all_conn, jiacy1_read_conn, jiacy0_all_conn, jiacy0_read_conn]:
|
for user in [jiacy1_all_conn, jiacy1_read_conn, jiacy0_all_conn, jiacy0_read_conn]:
|
||||||
user.execute(f'create topic db_jiacy as select * from db.stb')
|
user.execute(f'create topic db_jiacy as select * from db.stb')
|
||||||
user.execute('drop topic db_jiacy')
|
user.execute('drop topic db_jiacy')
|
||||||
for user in [jiacy1_write_conn,jiacy1_none_conn,jiacy0_write_conn,jiacy0_none_conn,jiacy1_all_conn,jiacy1_read_conn,jiacy0_all_conn,jiacy0_read_conn]:
|
for user in [jiacy1_write_conn, jiacy1_none_conn, jiacy0_write_conn, jiacy0_none_conn, jiacy1_all_conn,
|
||||||
|
jiacy1_read_conn, jiacy0_all_conn, jiacy0_read_conn]:
|
||||||
sql_list = []
|
sql_list = []
|
||||||
if user in [jiacy1_all_conn, jiacy1_read_conn, jiacy0_all_conn, jiacy0_read_conn]:
|
if user in [jiacy1_all_conn, jiacy1_read_conn, jiacy0_all_conn, jiacy0_read_conn]:
|
||||||
sql_list = ['drop topic root_db']
|
sql_list = ['drop topic root_db']
|
||||||
|
@@ -134,27 +140,20 @@ class TDTestCase:
             self.queryCols = 0
             self.queryResult = None
             tdLog.info(f"sql:{sql}, expect error occured")

     def tmq_commit_cb_print(tmq, resp, param=None):
         print(f"commit: {resp}, tmq: {tmq}, param: {param}")

     def subscribe_topic(self):
         print("create topic")
         tdSql.execute('create topic db_topic as select * from db.stb')
         tdSql.execute('grant subscribe on db_topic to jiacy1_all')
         print("build consumer")
-        conf = TaosTmqConf()
-        conf.set("group.id", "tg2")
-        conf.set("td.connect.user", "jiacy1_all")
-        conf.set("td.connect.pass", "123")
-        conf.set("enable.auto.commit", "true")
-        conf.set_auto_commit_cb(self.tmq_commit_cb_print, None)
-        tmq = conf.new_consumer()
+        tmq = Consumer({"group.id": "tg2", "td.connect.user": "jiacy1_all", "td.connect.pass": "123",
+                        "enable.auto.commit": "true"})
         print("build topic list")
-        topic_list = TaosTmqList()
-        topic_list.append("db_topic")
+        tmq.subscribe(["db_topic"])
         print("basic consume loop")
-        tmq.subscribe(topic_list)
-        sub_list = tmq.subscription()
-        print("subscribed topics: ", sub_list)
         c = 0
         l = 0
         for i in range(10):
@@ -163,19 +162,22 @@ class TDTestCase:
             res = tmq.poll(10)
             print(f"loop {l}")
             l += 1
-            if res:
-                c += 1
-                topic = res.get_topic_name()
-                vg = res.get_vgroup_id()
-                db = res.get_db_name()
-                print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
-                for row in res:
-                    print(row)
-                print("* committed")
-                tmq.commit(res)
-            else:
-                print(f"received empty message at loop {l} (committed {c})")
-                pass
+            if not res:
+                print(f"received empty message at loop {l} (committed {c})")
+                continue
+            if res.error():
+                print(f"consumer error at loop {l} (committed {c}) {res.error()}")
+                continue
+
+            c += 1
+            topic = res.topic()
+            db = res.database()
+            print(f"topic: {topic}\ndb: {db}")
+            for row in res:
+                print(row.fetchall())
+            print("* committed")
+            tmq.commit(res)

     def run(self):
         tdSql.prepare()
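Besides the control flow, the message object returned by poll() changes shape in this hunk: get_topic_name()/get_vgroup_id()/get_db_name() give way to topic() and database(), error() reports consumer-side failures, and iterating the message yields data blocks whose rows come from fetchall(). A compact sketch of that handling (assuming the tmq consumer built in subscribe_topic() above; not code from the diff):

    res = tmq.poll(10)
    if res and not res.error():      # None means the poll timed out; error() flags failures
        print(f"topic: {res.topic()}\ndb: {res.database()}")
        for block in res:            # iterating a message yields data blocks
            print(block.fetchall())  # rows of each block
        tmq.commit(res)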
@ -184,9 +186,11 @@ class TDTestCase:
|
||||||
self.drop_topic()
|
self.drop_topic()
|
||||||
self.user_privilege_check()
|
self.user_privilege_check()
|
||||||
self.subscribe_topic()
|
self.subscribe_topic()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
tdLog.success("%s successfully executed" % __file__)
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
|
||||||
tdCases.addWindows(__file__, TDTestCase())
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
tdCases.addLinux(__file__, TDTestCase())
|
tdCases.addLinux(__file__, TDTestCase())
|