update
This commit is contained in:
parent
66bf9b30b9
commit
17a502f569
|
@ -45,7 +45,7 @@ fi
|
||||||
|
|
||||||
# Now getting ready to execute Python
|
# Now getting ready to execute Python
|
||||||
# The following is the default of our standard dev env (Ubuntu 20.04), modify/adjust at your own risk
|
# The following is the default of our standard dev env (Ubuntu 20.04), modify/adjust at your own risk
|
||||||
PYTHON_EXEC=python3.8
|
PYTHON_EXEC=python3
|
||||||
|
|
||||||
# First we need to set up a path for Python to find our own TAOS modules, so that "import" can work.
|
# First we need to set up a path for Python to find our own TAOS modules, so that "import" can work.
|
||||||
# export PYTHONPATH=$(pwd)/../../src/connector/python:$(pwd)
|
# export PYTHONPATH=$(pwd)/../../src/connector/python:$(pwd)
|
||||||
|
|
|
@ -254,7 +254,7 @@ class WorkerThread:
|
||||||
|
|
||||||
|
|
||||||
class ThreadCoordinator:
|
class ThreadCoordinator:
|
||||||
WORKER_THREAD_TIMEOUT = 120 # Normal: 120
|
WORKER_THREAD_TIMEOUT = 1200 # Normal: 120
|
||||||
|
|
||||||
def __init__(self, pool: ThreadPool, dbManager: DbManager):
|
def __init__(self, pool: ThreadPool, dbManager: DbManager):
|
||||||
self._curStep = -1 # first step is 0
|
self._curStep = -1 # first step is 0
|
||||||
|
@ -674,9 +674,12 @@ class AnyState:
|
||||||
# only "under normal circumstances", as we may override it with the -b option
|
# only "under normal circumstances", as we may override it with the -b option
|
||||||
CAN_DROP_DB = 2
|
CAN_DROP_DB = 2
|
||||||
CAN_CREATE_FIXED_SUPER_TABLE = 3
|
CAN_CREATE_FIXED_SUPER_TABLE = 3
|
||||||
|
CAN_CREATE_STREAM = 3 # super table must exists
|
||||||
CAN_DROP_FIXED_SUPER_TABLE = 4
|
CAN_DROP_FIXED_SUPER_TABLE = 4
|
||||||
CAN_ADD_DATA = 5
|
CAN_ADD_DATA = 5
|
||||||
|
CAN_DROP_STREAM = 5
|
||||||
CAN_READ_DATA = 6
|
CAN_READ_DATA = 6
|
||||||
|
CAN_DELETE_DATA = 6
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._info = self.getInfo()
|
self._info = self.getInfo()
|
||||||
|
@ -727,12 +730,18 @@ class AnyState:
|
||||||
return False
|
return False
|
||||||
return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]
|
return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]
|
||||||
|
|
||||||
|
def canCreateStream(self):
|
||||||
|
return self._info[self.CAN_CREATE_STREAM]
|
||||||
|
|
||||||
def canAddData(self):
|
def canAddData(self):
|
||||||
return self._info[self.CAN_ADD_DATA]
|
return self._info[self.CAN_ADD_DATA]
|
||||||
|
|
||||||
def canReadData(self):
|
def canReadData(self):
|
||||||
return self._info[self.CAN_READ_DATA]
|
return self._info[self.CAN_READ_DATA]
|
||||||
|
|
||||||
|
def canDeleteData(self):
|
||||||
|
return self._info[self.CAN_DELETE_DATA]
|
||||||
|
|
||||||
def assertAtMostOneSuccess(self, tasks, cls):
|
def assertAtMostOneSuccess(self, tasks, cls):
|
||||||
sCnt = 0
|
sCnt = 0
|
||||||
for task in tasks:
|
for task in tasks:
|
||||||
|
@ -921,7 +930,7 @@ class StateMechine:
|
||||||
except taos.error.ProgrammingError as err:
|
except taos.error.ProgrammingError as err:
|
||||||
Logging.error("Failed to initialized state machine, cannot find current state: {}".format(err))
|
Logging.error("Failed to initialized state machine, cannot find current state: {}".format(err))
|
||||||
traceback.print_stack()
|
traceback.print_stack()
|
||||||
raise # re-throw
|
pass # re-throw
|
||||||
|
|
||||||
# TODO: seems no lnoger used, remove?
|
# TODO: seems no lnoger used, remove?
|
||||||
def getCurrentState(self):
|
def getCurrentState(self):
|
||||||
|
@ -974,14 +983,21 @@ class StateMechine:
|
||||||
# did not do this when openning connection, and this is NOT the worker
|
# did not do this when openning connection, and this is NOT the worker
|
||||||
# thread, which does this on their own
|
# thread, which does this on their own
|
||||||
dbc.use(dbName)
|
dbc.use(dbName)
|
||||||
|
|
||||||
if not dbc.hasTables(): # no tables
|
if not dbc.hasTables(): # no tables
|
||||||
|
|
||||||
Logging.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
|
Logging.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
|
||||||
return StateDbOnly()
|
return StateDbOnly()
|
||||||
|
|
||||||
# For sure we have tables, which means we must have the super table. # TODO: are we sure?
|
# For sure we have tables, which means we must have the super table. # TODO: are we sure?
|
||||||
|
|
||||||
sTable = self._db.getFixedSuperTable()
|
sTable = self._db.getFixedSuperTable()
|
||||||
|
|
||||||
|
|
||||||
if sTable.hasRegTables(dbc): # no regular tables
|
if sTable.hasRegTables(dbc): # no regular tables
|
||||||
|
# print("debug=====*\n"*100)
|
||||||
Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
|
Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
|
||||||
|
|
||||||
return StateSuperTableOnly()
|
return StateSuperTableOnly()
|
||||||
else: # has actual tables
|
else: # has actual tables
|
||||||
Logging.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
|
Logging.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
|
||||||
|
@ -1109,6 +1125,7 @@ class Database:
|
||||||
return "fs_table"
|
return "fs_table"
|
||||||
|
|
||||||
def getFixedSuperTable(self) -> TdSuperTable:
|
def getFixedSuperTable(self) -> TdSuperTable:
|
||||||
|
|
||||||
return TdSuperTable(self.getFixedSuperTableName(), self.getName())
|
return TdSuperTable(self.getFixedSuperTableName(), self.getName())
|
||||||
|
|
||||||
# We aim to create a starting time tick, such that, whenever we run our test here once
|
# We aim to create a starting time tick, such that, whenever we run our test here once
|
||||||
|
@ -1342,6 +1359,11 @@ class Task():
|
||||||
0x2603, # Table does not exist, replaced by 2662 below
|
0x2603, # Table does not exist, replaced by 2662 below
|
||||||
0x260d, # Tags number not matched
|
0x260d, # Tags number not matched
|
||||||
0x2662, # Table does not exist #TODO: what about 2603 above?
|
0x2662, # Table does not exist #TODO: what about 2603 above?
|
||||||
|
0x032C, # Object is creating
|
||||||
|
0x032D, # Object is dropping
|
||||||
|
0x03D3, # Conflict transaction not completed
|
||||||
|
0x0707, # Query not ready , it always occur at replica 3
|
||||||
|
0x707, # Query not ready
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -1638,9 +1660,12 @@ class TaskCreateDb(StateTransitionTask):
|
||||||
# numReplica = Dice.throw(Settings.getConfig().max_replicas) + 1 # 1,2 ... N
|
# numReplica = Dice.throw(Settings.getConfig().max_replicas) + 1 # 1,2 ... N
|
||||||
numReplica = Config.getConfig().num_replicas # fixed, always
|
numReplica = Config.getConfig().num_replicas # fixed, always
|
||||||
repStr = "replica {}".format(numReplica)
|
repStr = "replica {}".format(numReplica)
|
||||||
updatePostfix = "update 1" if Config.getConfig().verify_data else "" # allow update only when "verify data" is active
|
updatePostfix = "" if Config.getConfig().verify_data else "" # allow update only when "verify data" is active , 3.0 version default is update 1
|
||||||
|
vg_nums = random.randint(1,8)
|
||||||
|
cache_model = Dice.choice(['none' , 'last_row' , 'last_value' , 'both'])
|
||||||
|
buffer = random.randint(3,128)
|
||||||
dbName = self._db.getName()
|
dbName = self._db.getName()
|
||||||
self.execWtSql(wt, "create database {} {} {} ".format(dbName, repStr, updatePostfix ) )
|
self.execWtSql(wt, "create database {} {} {} vgroups {} cachemodel '{}' buffer {} ".format(dbName, repStr, updatePostfix, vg_nums, cache_model,buffer ) )
|
||||||
if dbName == "db_0" and Config.getConfig().use_shadow_db:
|
if dbName == "db_0" and Config.getConfig().use_shadow_db:
|
||||||
self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix ) )
|
self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix ) )
|
||||||
|
|
||||||
|
@ -1657,6 +1682,69 @@ class TaskDropDb(StateTransitionTask):
|
||||||
self.execWtSql(wt, "drop database {}".format(self._db.getName()))
|
self.execWtSql(wt, "drop database {}".format(self._db.getName()))
|
||||||
Logging.debug("[OPS] database dropped at {}".format(time.time()))
|
Logging.debug("[OPS] database dropped at {}".format(time.time()))
|
||||||
|
|
||||||
|
class TaskCreateStream(StateTransitionTask):
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def getEndState(cls):
|
||||||
|
return StateHasData()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def canBeginFrom(cls, state: AnyState):
|
||||||
|
return state.canCreateStream()
|
||||||
|
|
||||||
|
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||||
|
dbname = self._db.getName()
|
||||||
|
|
||||||
|
sub_stream_name = dbname+ '_sub_stream'
|
||||||
|
sub_stream_tb_name = 'avg_sub'
|
||||||
|
super_stream_name = dbname+ '_super_stream'
|
||||||
|
super_stream_tb_name = 'avg_super'
|
||||||
|
if not self._db.exists(wt.getDbConn()):
|
||||||
|
Logging.debug("Skipping task, no DB yet")
|
||||||
|
return
|
||||||
|
|
||||||
|
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
|
||||||
|
# wt.execSql("use db") # should always be in place
|
||||||
|
|
||||||
|
# create stream
|
||||||
|
'''
|
||||||
|
CREATE STREAM avg_vol_s INTO avg_vol AS SELECT _wstartts, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);
|
||||||
|
'''
|
||||||
|
stbname =sTable.getName()
|
||||||
|
sub_tables = sTable.getRegTables(wt.getDbConn())
|
||||||
|
if sub_tables:
|
||||||
|
|
||||||
|
if sub_tables: # if not empty
|
||||||
|
sub_tbname = sub_tables[0]
|
||||||
|
|
||||||
|
# create stream with query above sub_table
|
||||||
|
stream_sql = 'create stream {} into {}.{} as select count(*), avg(speed) FROM {}.{} PARTITION BY tbname INTERVAL(5s) SLIDING(3s) '.format(sub_stream_name,dbname,sub_stream_tb_name ,dbname,sub_tbname)
|
||||||
|
try:
|
||||||
|
self.execWtSql(wt, stream_sql)
|
||||||
|
except taos.error.ProgrammingError as err:
|
||||||
|
errno = Helper.convertErrno(err.errno)
|
||||||
|
if errno in [0x03f0]: # stream already exists
|
||||||
|
# stream need drop before drop table
|
||||||
|
pass
|
||||||
|
|
||||||
|
sTable.setStreamName(sub_stream_name)
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
|
||||||
|
else:
|
||||||
|
stream_sql = 'create stream {} into {}.{} as select count(*), avg(speed) FROM {}.{} PARTITION BY tbname INTERVAL(5s) SLIDING(3s) '.format(super_stream_name,dbname,super_stream_tb_name,dbname,stbname)
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.execWtSql(wt, stream_sql)
|
||||||
|
except taos.error.ProgrammingError as err:
|
||||||
|
errno = Helper.convertErrno(err.errno)
|
||||||
|
if errno in [0x03f0]: # stream already exists
|
||||||
|
# stream need drop before drop table
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class TaskCreateSuperTable(StateTransitionTask):
|
class TaskCreateSuperTable(StateTransitionTask):
|
||||||
@classmethod
|
@classmethod
|
||||||
def getEndState(cls):
|
def getEndState(cls):
|
||||||
|
@ -1688,15 +1776,40 @@ class TdSuperTable:
|
||||||
def __init__(self, stName, dbName):
|
def __init__(self, stName, dbName):
|
||||||
self._stName = stName
|
self._stName = stName
|
||||||
self._dbName = dbName
|
self._dbName = dbName
|
||||||
|
self._streamName = []
|
||||||
|
|
||||||
def getName(self):
|
def getName(self):
|
||||||
return self._stName
|
return self._stName
|
||||||
|
|
||||||
|
def setStreamName(self,name):
|
||||||
|
self._streamName.append(name)
|
||||||
|
|
||||||
|
def getStreamName(self):
|
||||||
|
return self._streamName
|
||||||
|
|
||||||
def drop(self, dbc, skipCheck = False):
|
def drop(self, dbc, skipCheck = False):
|
||||||
dbName = self._dbName
|
dbName = self._dbName
|
||||||
if self.exists(dbc) : # if myself exists
|
if self.exists(dbc) : # if myself exists
|
||||||
fullTableName = dbName + '.' + self._stName
|
fullTableName = dbName + '.' + self._stName
|
||||||
|
try:
|
||||||
dbc.execute("DROP TABLE {}".format(fullTableName))
|
dbc.execute("DROP TABLE {}".format(fullTableName))
|
||||||
|
except taos.error.ProgrammingError as err:
|
||||||
|
errno = Helper.convertErrno(err.errno)
|
||||||
|
if errno in [1011,0x3F3,0x03f3,0x2662]: # table doesn't exist
|
||||||
|
pass
|
||||||
|
# # stream need drop before drop table
|
||||||
|
# for stream in self.getStreamName():
|
||||||
|
# drop_stream_sql = 'drop stream {}'.format(stream)
|
||||||
|
# try:
|
||||||
|
# dbc.execute(drop_stream_sql)
|
||||||
|
# except taos.error.ProgrammingError as err:
|
||||||
|
# # correcting for strange error number scheme
|
||||||
|
# errno3 = Helper.convertErrno(err.errno)
|
||||||
|
# if errno3 in [1011,0x3F3,0x03f3,0x2662,0x03f1]: # stream not exists
|
||||||
|
# pass
|
||||||
|
# dbc.execute("DROP TABLE {}".format(fullTableName))
|
||||||
|
# pass
|
||||||
|
|
||||||
else:
|
else:
|
||||||
if not skipCheck:
|
if not skipCheck:
|
||||||
raise CrashGenError("Cannot drop non-existant super table: {}".format(self._stName))
|
raise CrashGenError("Cannot drop non-existant super table: {}".format(self._stName))
|
||||||
|
@ -1712,9 +1825,16 @@ class TdSuperTable:
|
||||||
dbName = self._dbName
|
dbName = self._dbName
|
||||||
dbc.execute("USE " + dbName)
|
dbc.execute("USE " + dbName)
|
||||||
fullTableName = dbName + '.' + self._stName
|
fullTableName = dbName + '.' + self._stName
|
||||||
|
|
||||||
if dbc.existsSuperTable(self._stName):
|
if dbc.existsSuperTable(self._stName):
|
||||||
if dropIfExists:
|
if dropIfExists:
|
||||||
|
|
||||||
dbc.execute("DROP TABLE {}".format(fullTableName))
|
dbc.execute("DROP TABLE {}".format(fullTableName))
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# dbc.execute("DROP TABLE {}".format(fullTableName))
|
||||||
else: # error
|
else: # error
|
||||||
raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName))
|
raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName))
|
||||||
|
|
||||||
|
@ -1733,7 +1853,7 @@ class TdSuperTable:
|
||||||
def getRegTables(self, dbc: DbConn):
|
def getRegTables(self, dbc: DbConn):
|
||||||
dbName = self._dbName
|
dbName = self._dbName
|
||||||
try:
|
try:
|
||||||
dbc.query("select TBNAME from {}.{}".format(dbName, self._stName)) # TODO: analyze result set later
|
dbc.query("select distinct TBNAME from {}.{}".format(dbName, self._stName)) # TODO: analyze result set later
|
||||||
except taos.error.ProgrammingError as err:
|
except taos.error.ProgrammingError as err:
|
||||||
errno2 = Helper.convertErrno(err.errno)
|
errno2 = Helper.convertErrno(err.errno)
|
||||||
Logging.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
|
Logging.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
|
||||||
|
@ -1743,7 +1863,44 @@ class TdSuperTable:
|
||||||
return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation
|
return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation
|
||||||
|
|
||||||
def hasRegTables(self, dbc: DbConn):
|
def hasRegTables(self, dbc: DbConn):
|
||||||
|
# print(self._stName)
|
||||||
|
# dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName))
|
||||||
|
# print(dbc.getQueryResult())
|
||||||
|
if self.hasStreamTables(dbc) or self.hasStreams(dbc):
|
||||||
|
if self.dropStreams(dbc):
|
||||||
|
self.dropStreamTables(dbc)
|
||||||
|
if dbc.existsSuperTable(self._stName):
|
||||||
|
|
||||||
return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0
|
return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def hasStreamTables(self,dbc: DbConn):
|
||||||
|
|
||||||
|
return dbc.query("show {}.stables like 'avg%'".format(self._dbName)) > 0
|
||||||
|
|
||||||
|
def hasStreams(self,dbc: DbConn):
|
||||||
|
return dbc.query("show streams") > 0
|
||||||
|
|
||||||
|
def dropStreams(self,dbc:DbConn):
|
||||||
|
dbc.query("show streams ")
|
||||||
|
Streams = dbc.getQueryResult()
|
||||||
|
for Stream in Streams:
|
||||||
|
if Stream[0].startswith(self._dbName):
|
||||||
|
dbc.execute('drop stream {}'.format(Stream[0]))
|
||||||
|
|
||||||
|
return not dbc.query("show streams ") > 0
|
||||||
|
|
||||||
|
def dropStreamTables(self, dbc: DbConn):
|
||||||
|
dbc.query("show {}.stables like 'avg%'".format(self._dbName))
|
||||||
|
|
||||||
|
StreamTables = dbc.getQueryResult()
|
||||||
|
|
||||||
|
for StreamTable in StreamTables:
|
||||||
|
if self.dropStreams:
|
||||||
|
dbc.execute('drop table {}.{}'.format(self._dbName,StreamTable[0]))
|
||||||
|
|
||||||
|
return not dbc.query("show {}.stables like 'avg%'".format(self._dbName))
|
||||||
|
|
||||||
def ensureRegTable(self, task: Optional[Task], dbc: DbConn, regTableName: str):
|
def ensureRegTable(self, task: Optional[Task], dbc: DbConn, regTableName: str):
|
||||||
'''
|
'''
|
||||||
|
@ -1838,10 +1995,46 @@ class TdSuperTable:
|
||||||
# Run the query against the regular table first
|
# Run the query against the regular table first
|
||||||
doAggr = (Dice.throw(2) == 0) # 1 in 2 chance
|
doAggr = (Dice.throw(2) == 0) # 1 in 2 chance
|
||||||
if not doAggr: # don't do aggregate query, just simple one
|
if not doAggr: # don't do aggregate query, just simple one
|
||||||
|
commonExpr = Dice.choice([
|
||||||
|
'*',
|
||||||
|
# 'abs(speed)',
|
||||||
|
# 'acos(speed)',
|
||||||
|
# 'asin(speed)',
|
||||||
|
# 'atan(speed)',
|
||||||
|
# 'ceil(speed)',
|
||||||
|
# 'cos(speed)',
|
||||||
|
# 'cos(speed)',
|
||||||
|
# 'floor(speed)',
|
||||||
|
# 'log(speed,2)',
|
||||||
|
# 'pow(speed,2)',
|
||||||
|
# 'round(speed)',
|
||||||
|
# 'sin(speed)',
|
||||||
|
# 'sqrt(speed)',
|
||||||
|
# 'char_length(color)',
|
||||||
|
# 'concat(color,color)',
|
||||||
|
# 'concat_ws(" ", color,color," ")',
|
||||||
|
# 'length(color)',
|
||||||
|
# 'lower(color)',
|
||||||
|
# 'ltrim(color)',
|
||||||
|
# 'substr(color , 2)',
|
||||||
|
# 'upper(color)',
|
||||||
|
# 'cast(speed as double)',
|
||||||
|
# 'cast(ts as bigint)',
|
||||||
|
# # 'TO_ISO8601(color)',
|
||||||
|
# # 'TO_UNIXTIMESTAMP(ts)',
|
||||||
|
# 'now()',
|
||||||
|
# 'timediff(ts,now)',
|
||||||
|
# 'timezone()',
|
||||||
|
# 'TIMETRUNCATE(ts,1s)',
|
||||||
|
# 'TIMEZONE()',
|
||||||
|
# 'TODAY()',
|
||||||
|
# 'distinct(color)'
|
||||||
|
]
|
||||||
|
)
|
||||||
ret.append(SqlQuery( # reg table
|
ret.append(SqlQuery( # reg table
|
||||||
"select {} from {}.{}".format('*', self._dbName, rTbName)))
|
"select {} from {}.{}".format(commonExpr, self._dbName, rTbName)))
|
||||||
ret.append(SqlQuery( # super table
|
ret.append(SqlQuery( # super table
|
||||||
"select {} from {}.{}".format('*', self._dbName, self.getName())))
|
"select {} from {}.{}".format(commonExpr, self._dbName, self.getName())))
|
||||||
else: # Aggregate query
|
else: # Aggregate query
|
||||||
aggExpr = Dice.choice([
|
aggExpr = Dice.choice([
|
||||||
'count(*)',
|
'count(*)',
|
||||||
|
@ -1857,17 +2050,38 @@ class TdSuperTable:
|
||||||
'top(speed, 50)', # TODO: not supported?
|
'top(speed, 50)', # TODO: not supported?
|
||||||
'bottom(speed, 50)', # TODO: not supported?
|
'bottom(speed, 50)', # TODO: not supported?
|
||||||
'apercentile(speed, 10)', # TODO: TD-1316
|
'apercentile(speed, 10)', # TODO: TD-1316
|
||||||
# 'last_row(speed)', # TODO: commented out per TD-3231, we should re-create
|
'last_row(*)', # TODO: commented out per TD-3231, we should re-create
|
||||||
# Transformation Functions
|
# Transformation Functions
|
||||||
# 'diff(speed)', # TODO: no supported?!
|
# 'diff(speed)', # TODO: no supported?!
|
||||||
'spread(speed)'
|
'spread(speed)',
|
||||||
|
# 'elapsed(ts)',
|
||||||
|
# 'mode(speed)',
|
||||||
|
# 'bottom(speed,1)',
|
||||||
|
# 'top(speed,1)',
|
||||||
|
# 'tail(speed,1)',
|
||||||
|
# 'unique(color)',
|
||||||
|
# 'csum(speed)',
|
||||||
|
# 'DERIVATIVE(speed,1s,1)',
|
||||||
|
# 'diff(speed,1)',
|
||||||
|
# 'irate(speed)',
|
||||||
|
# 'mavg(speed,3)',
|
||||||
|
# 'sample(speed,5)',
|
||||||
|
# 'STATECOUNT(speed,"LT",1)',
|
||||||
|
# 'STATEDURATION(speed,"LT",1)',
|
||||||
|
# 'twa(speed)'
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
]) # TODO: add more from 'top'
|
]) # TODO: add more from 'top'
|
||||||
|
|
||||||
|
|
||||||
# if aggExpr not in ['stddev(speed)']: # STDDEV not valid for super tables?! (Done in TD-1049)
|
# if aggExpr not in ['stddev(speed)']: # STDDEV not valid for super tables?! (Done in TD-1049)
|
||||||
sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName())
|
sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName())
|
||||||
if Dice.throw(3) == 0: # 1 in X chance
|
if Dice.throw(3) == 0: # 1 in X chance
|
||||||
sql = sql + ' GROUP BY color'
|
partion_expr = Dice.choice(['color','tbname'])
|
||||||
|
sql = sql + ' partition BY ' + partion_expr + ' order by ' + partion_expr
|
||||||
Progress.emit(Progress.QUERY_GROUP_BY)
|
Progress.emit(Progress.QUERY_GROUP_BY)
|
||||||
# Logging.info("Executing GROUP-BY query: " + sql)
|
# Logging.info("Executing GROUP-BY query: " + sql)
|
||||||
ret.append(SqlQuery(sql))
|
ret.append(SqlQuery(sql))
|
||||||
|
@ -1964,16 +2178,17 @@ class TaskDropSuperTable(StateTransitionTask):
|
||||||
isSuccess = True
|
isSuccess = True
|
||||||
for i in tblSeq:
|
for i in tblSeq:
|
||||||
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
|
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
|
||||||
|
streams_prix = self._db.getName()
|
||||||
try:
|
try:
|
||||||
self.execWtSql(wt, "drop table {}.{}".
|
self.execWtSql(wt, "drop table {}.{}".
|
||||||
format(self._db.getName(), regTableName)) # nRows always 0, like MySQL
|
format(self._db.getName(), regTableName)) # nRows always 0, like MySQL
|
||||||
except taos.error.ProgrammingError as err:
|
except taos.error.ProgrammingError as err:
|
||||||
# correcting for strange error number scheme
|
pass
|
||||||
errno2 = Helper.convertErrno(err.errno)
|
|
||||||
if (errno2 in [0x362]): # mnode invalid table name
|
|
||||||
isSuccess = False
|
|
||||||
Logging.debug("[DB] Acceptable error when dropping a table")
|
|
||||||
continue # try to delete next regular table
|
|
||||||
|
|
||||||
if (not tickOutput):
|
if (not tickOutput):
|
||||||
tickOutput = True # Print only one time
|
tickOutput = True # Print only one time
|
||||||
|
@ -1984,7 +2199,17 @@ class TaskDropSuperTable(StateTransitionTask):
|
||||||
|
|
||||||
# Drop the super table itself
|
# Drop the super table itself
|
||||||
tblName = self._db.getFixedSuperTableName()
|
tblName = self._db.getFixedSuperTableName()
|
||||||
|
try:
|
||||||
self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName))
|
self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName))
|
||||||
|
except taos.error.ProgrammingError as err:
|
||||||
|
# correcting for strange error number scheme
|
||||||
|
errno2 = Helper.convertErrno(err.errno)
|
||||||
|
if (errno2 in [0x362]): # mnode invalid table name
|
||||||
|
isSuccess = False
|
||||||
|
Logging.debug("[DB] Acceptable error when dropping a table")
|
||||||
|
elif errno2 in [1011,0x3F3,0x03f3]: # table doesn't exist
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class TaskAlterTags(StateTransitionTask):
|
class TaskAlterTags(StateTransitionTask):
|
||||||
|
@ -2234,6 +2459,212 @@ class TaskAddData(StateTransitionTask):
|
||||||
|
|
||||||
self.activeTable.discard(i) # not raising an error, unlike remove
|
self.activeTable.discard(i) # not raising an error, unlike remove
|
||||||
|
|
||||||
|
class TaskDeleteData(StateTransitionTask):
|
||||||
|
# Track which table is being actively worked on
|
||||||
|
activeTable: Set[int] = set()
|
||||||
|
|
||||||
|
# We use these two files to record operations to DB, useful for power-off tests
|
||||||
|
fAddLogReady = None # type: Optional[io.TextIOWrapper]
|
||||||
|
fAddLogDone = None # type: Optional[io.TextIOWrapper]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def prepToRecordOps(cls):
|
||||||
|
if Config.getConfig().record_ops:
|
||||||
|
if (cls.fAddLogReady is None):
|
||||||
|
Logging.info(
|
||||||
|
"Recording in a file operations to be performed...")
|
||||||
|
cls.fAddLogReady = open("add_log_ready.txt", "w")
|
||||||
|
if (cls.fAddLogDone is None):
|
||||||
|
Logging.info("Recording in a file operations completed...")
|
||||||
|
cls.fAddLogDone = open("add_log_done.txt", "w")
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def getEndState(cls):
|
||||||
|
return StateHasData()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def canBeginFrom(cls, state: AnyState):
|
||||||
|
return state.canDeleteData()
|
||||||
|
|
||||||
|
def _lockTableIfNeeded(self, fullTableName, extraMsg = ''):
|
||||||
|
if Config.getConfig().verify_data:
|
||||||
|
# Logging.info("Locking table: {}".format(fullTableName))
|
||||||
|
self.lockTable(fullTableName)
|
||||||
|
# Logging.info("Table locked {}: {}".format(extraMsg, fullTableName))
|
||||||
|
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
|
||||||
|
else:
|
||||||
|
# Logging.info("Skipping locking table")
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _unlockTableIfNeeded(self, fullTableName):
|
||||||
|
if Config.getConfig().verify_data:
|
||||||
|
# Logging.info("Unlocking table: {}".format(fullTableName))
|
||||||
|
self.unlockTable(fullTableName)
|
||||||
|
# Logging.info("Table unlocked: {}".format(fullTableName))
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
# Logging.info("Skipping unlocking table")
|
||||||
|
|
||||||
|
def _deleteData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
|
||||||
|
numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
|
||||||
|
del_Records = int(numRecords/5)
|
||||||
|
if Dice.throw(2) == 0:
|
||||||
|
for j in range(del_Records): # number of records per table
|
||||||
|
intToWrite = db.getNextInt()
|
||||||
|
nextTick = db.getNextTick()
|
||||||
|
# nextColor = db.getNextColor()
|
||||||
|
if Config.getConfig().record_ops:
|
||||||
|
self.prepToRecordOps()
|
||||||
|
if self.fAddLogReady is None:
|
||||||
|
raise CrashGenError("Unexpected empty fAddLogReady")
|
||||||
|
self.fAddLogReady.write("Ready to delete {} to {}\n".format(intToWrite, regTableName))
|
||||||
|
self.fAddLogReady.flush()
|
||||||
|
os.fsync(self.fAddLogReady.fileno())
|
||||||
|
|
||||||
|
# TODO: too ugly trying to lock the table reliably, refactor...
|
||||||
|
fullTableName = db.getName() + '.' + regTableName
|
||||||
|
self._lockTableIfNeeded(fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock
|
||||||
|
|
||||||
|
try:
|
||||||
|
sql = "delete from {} where ts = '{}' ;".format( # removed: tags ('{}', {})
|
||||||
|
fullTableName,
|
||||||
|
# ds.getFixedSuperTableName(),
|
||||||
|
# ds.getNextBinary(), ds.getNextFloat(),
|
||||||
|
nextTick)
|
||||||
|
|
||||||
|
# print(sql)
|
||||||
|
# Logging.info("Adding data: {}".format(sql))
|
||||||
|
dbc.execute(sql)
|
||||||
|
# Logging.info("Data added: {}".format(sql))
|
||||||
|
intWrote = intToWrite
|
||||||
|
|
||||||
|
# Quick hack, attach an update statement here. TODO: create an "update" task
|
||||||
|
if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB
|
||||||
|
intToUpdate = db.getNextInt() # Updated, but should not succeed
|
||||||
|
# nextColor = db.getNextColor()
|
||||||
|
sql = "delete from {} where ts = '{}' ;".format( # "INSERt" means "update" here
|
||||||
|
fullTableName,
|
||||||
|
nextTick)
|
||||||
|
# sql = "UPDATE {} set speed={}, color='{}' WHERE ts='{}'".format(
|
||||||
|
# fullTableName, db.getNextInt(), db.getNextColor(), nextTick)
|
||||||
|
dbc.execute(sql)
|
||||||
|
intWrote = intToUpdate # We updated, seems TDengine non-cluster accepts this.
|
||||||
|
|
||||||
|
except: # Any exception at all
|
||||||
|
self._unlockTableIfNeeded(fullTableName)
|
||||||
|
raise
|
||||||
|
|
||||||
|
# Now read it back and verify, we might encounter an error if table is dropped
|
||||||
|
if Config.getConfig().verify_data: # only if command line asks for it
|
||||||
|
try:
|
||||||
|
readBack = dbc.queryScalar("SELECT * from {}.{} WHERE ts='{}'".
|
||||||
|
format(db.getName(), regTableName, nextTick))
|
||||||
|
if readBack == None :
|
||||||
|
pass
|
||||||
|
except taos.error.ProgrammingError as err:
|
||||||
|
errno = Helper.convertErrno(err.errno)
|
||||||
|
if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result
|
||||||
|
print("D1",end="") # D1 means delete data success and only 1 record
|
||||||
|
|
||||||
|
elif errno in [0x218, 0x362]: # table doesn't exist
|
||||||
|
# do nothing
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
# Re-throw otherwise
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
self._unlockTableIfNeeded(fullTableName) # Quite ugly, refactor lock/unlock
|
||||||
|
# Done with read-back verification, unlock the table now
|
||||||
|
# Successfully wrote the data into the DB, let's record it somehow
|
||||||
|
te.recordDataMark(intWrote)
|
||||||
|
else:
|
||||||
|
|
||||||
|
# delete all datas and verify datas ,expected table is empty
|
||||||
|
if Config.getConfig().record_ops:
|
||||||
|
self.prepToRecordOps()
|
||||||
|
if self.fAddLogReady is None:
|
||||||
|
raise CrashGenError("Unexpected empty fAddLogReady")
|
||||||
|
self.fAddLogReady.write("Ready to delete {} to {}\n".format(intToWrite, regTableName))
|
||||||
|
self.fAddLogReady.flush()
|
||||||
|
os.fsync(self.fAddLogReady.fileno())
|
||||||
|
|
||||||
|
# TODO: too ugly trying to lock the table reliably, refactor...
|
||||||
|
fullTableName = db.getName() + '.' + regTableName
|
||||||
|
self._lockTableIfNeeded(fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock
|
||||||
|
|
||||||
|
try:
|
||||||
|
sql = "delete from {} ;".format( # removed: tags ('{}', {})
|
||||||
|
fullTableName)
|
||||||
|
# Logging.info("Adding data: {}".format(sql))
|
||||||
|
dbc.execute(sql)
|
||||||
|
# Logging.info("Data added: {}".format(sql))
|
||||||
|
|
||||||
|
# Quick hack, attach an update statement here. TODO: create an "update" task
|
||||||
|
if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB
|
||||||
|
sql = "delete from {} ;".format( # "INSERt" means "update" here
|
||||||
|
fullTableName)
|
||||||
|
dbc.execute(sql)
|
||||||
|
|
||||||
|
except: # Any exception at all
|
||||||
|
self._unlockTableIfNeeded(fullTableName)
|
||||||
|
raise
|
||||||
|
|
||||||
|
# Now read it back and verify, we might encounter an error if table is dropped
|
||||||
|
if Config.getConfig().verify_data: # only if command line asks for it
|
||||||
|
try:
|
||||||
|
readBack = dbc.queryScalar("SELECT * from {}.{} ".
|
||||||
|
format(db.getName(), regTableName))
|
||||||
|
if readBack == None :
|
||||||
|
pass
|
||||||
|
except taos.error.ProgrammingError as err:
|
||||||
|
errno = Helper.convertErrno(err.errno)
|
||||||
|
if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result
|
||||||
|
print("Da",end="") # Da means delete data success and for all datas
|
||||||
|
|
||||||
|
elif errno in [0x218, 0x362]: # table doesn't exist
|
||||||
|
# do nothing
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
# Re-throw otherwise
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
self._unlockTableIfNeeded(fullTableName) # Quite ugly, refactor lock/unlock
|
||||||
|
# Done with read-back verification, unlock the table now
|
||||||
|
|
||||||
|
if Config.getConfig().record_ops:
|
||||||
|
if self.fAddLogDone is None:
|
||||||
|
raise CrashGenError("Unexpected empty fAddLogDone")
|
||||||
|
self.fAddLogDone.write("Wrote {} to {}\n".format(intWrote, regTableName))
|
||||||
|
self.fAddLogDone.flush()
|
||||||
|
os.fsync(self.fAddLogDone.fileno())
|
||||||
|
|
||||||
|
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||||
|
# ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
|
||||||
|
db = self._db
|
||||||
|
dbc = wt.getDbConn()
|
||||||
|
numTables = self.LARGE_NUMBER_OF_TABLES if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES
|
||||||
|
numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
|
||||||
|
tblSeq = list(range(numTables ))
|
||||||
|
random.shuffle(tblSeq) # now we have random sequence
|
||||||
|
for i in tblSeq:
|
||||||
|
if (i in self.activeTable): # wow already active
|
||||||
|
# print("x", end="", flush=True) # concurrent insertion
|
||||||
|
Progress.emit(Progress.CONCURRENT_INSERTION)
|
||||||
|
else:
|
||||||
|
self.activeTable.add(i) # marking it active
|
||||||
|
|
||||||
|
dbName = db.getName()
|
||||||
|
sTable = db.getFixedSuperTable()
|
||||||
|
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
|
||||||
|
fullTableName = dbName + '.' + regTableName
|
||||||
|
# self._lockTable(fullTableName) # "create table" below. Stop it if the table is "locked"
|
||||||
|
sTable.ensureRegTable(self, wt.getDbConn(), regTableName) # Ensure the table exists
|
||||||
|
# self._unlockTable(fullTableName)
|
||||||
|
|
||||||
|
self._deleteData(db, dbc, regTableName, te)
|
||||||
|
|
||||||
|
self.activeTable.discard(i) # not raising an error, unlike remove
|
||||||
|
|
||||||
|
|
||||||
class ThreadStacks: # stack info for all threads
|
class ThreadStacks: # stack info for all threads
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
@ -2259,7 +2690,8 @@ class ThreadStacks: # stack info for all threads
|
||||||
# Now print
|
# Now print
|
||||||
print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(shortTid))
|
print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(shortTid))
|
||||||
lastSqlForThread = DbConn.fetchSqlForThread(shortTid)
|
lastSqlForThread = DbConn.fetchSqlForThread(shortTid)
|
||||||
print("Last SQL statement attempted from thread {} is: {}".format(shortTid, lastSqlForThread))
|
time_cost = DbConn.get_time_cost()
|
||||||
|
print("Last SQL statement attempted from thread {} ({:.4f} sec ago) is: {}".format(shortTid, time_cost ,lastSqlForThread))
|
||||||
stackFrame = 0
|
stackFrame = 0
|
||||||
for frame in stack: # was using: reversed(stack)
|
for frame in stack: # was using: reversed(stack)
|
||||||
# print(frame)
|
# print(frame)
|
||||||
|
|
Loading…
Reference in New Issue