This commit is contained in:
wenzhouwww@live.cn 2022-11-15 09:12:42 +08:00
parent 6175929113
commit b47052b7af
1 changed files with 48 additions and 196 deletions

View File

@ -1415,6 +1415,7 @@ class Task():
0x03E1, # failed on tmq_subscribe ,topic not exist 0x03E1, # failed on tmq_subscribe ,topic not exist
0x03ed , # Topic must be dropped first, SQL: drop database db_0 0x03ed , # Topic must be dropped first, SQL: drop database db_0
0x0203 , # Invalid value 0x0203 , # Invalid value
0x03f0 , # Stream already exist , topic already exists
@ -1732,12 +1733,6 @@ class TaskDropDb(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# drop topics before drop db
# if self._db.getFixedSuperTable().hasTopics(wt.getDbConn()):
# self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),None)
# self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),self._db.getFixedSuperTableName)
try: try:
self.queryWtSql(wt, "drop database {}".format(self._db.getName())) # drop database maybe failed ,because topic exists self.queryWtSql(wt, "drop database {}".format(self._db.getName())) # drop database maybe failed ,because topic exists
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
@ -1748,10 +1743,6 @@ class TaskDropDb(StateTransitionTask):
Logging.debug("[OPS] database dropped at {}".format(time.time())) Logging.debug("[OPS] database dropped at {}".format(time.time()))
# Streams will generator TD-20237 it will crash taosd , start this task when this issue fixed
class TaskCreateStream(StateTransitionTask): class TaskCreateStream(StateTransitionTask):
@classmethod @classmethod
@ -1775,60 +1766,25 @@ class TaskCreateStream(StateTransitionTask):
sTable = self._db.getFixedSuperTable() # type: TdSuperTable sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place # wt.execSql("use db") # should always be in place
# create stream
# CREATE STREAM avg_vol_s INTO avg_vol AS SELECT _wstartts, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);
stbname =sTable.getName() stbname =sTable.getName()
sub_tables = sTable.getRegTables(wt.getDbConn()) sub_tables = sTable.getRegTables(wt.getDbConn())
aggExpr = Dice.choice([ aggExpr = Dice.choice([
'count(*)', 'count(*)', 'avg(speed)', 'sum(speed)', 'stddev(speed)','min(speed)', 'max(speed)', 'first(speed)', 'last(speed)',
'avg(speed)', 'apercentile(speed, 10)', 'last_row(*)', 'twa(speed)'])
# 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
'sum(speed)',
'stddev(speed)',
# SELECTOR functions
'min(speed)',
'max(speed)',
'first(speed)',
'last(speed)',
'apercentile(speed, 10)', # TODO: TD-1316
'last_row(*)', # TODO: commented out per TD-3231, we should re-create
# Transformation Functions
'twa(speed)'
]) # TODO: add more from 'top' stream_sql = '' # set default value
if sub_tables: if sub_tables:
if sub_tables: # if not empty
sub_tbname = sub_tables[0] sub_tbname = sub_tables[0]
# create stream with query above sub_table # create stream with query above sub_table
stream_sql = 'create stream {} into {}.{} as select {}, avg(speed) FROM {}.{} PARTITION BY tbname INTERVAL(5s) SLIDING(3s) '.format(sub_stream_name,dbname,sub_stream_tb_name ,aggExpr,dbname,sub_tbname) stream_sql = 'create stream {} into {}.{} as select {}, avg(speed) FROM {}.{} PARTITION BY tbname INTERVAL(5s) SLIDING(3s) '.\
try: format(sub_stream_name,dbname,sub_stream_tb_name ,aggExpr,dbname,sub_tbname)
else:
stream_sql = 'create stream {} into {}.{} as select {}, avg(speed) FROM {}.{} PARTITION BY tbname INTERVAL(5s) SLIDING(3s) '.\
format(super_stream_name,dbname,super_stream_tb_name,aggExpr, dbname,stbname)
self.execWtSql(wt, stream_sql) self.execWtSql(wt, stream_sql)
Logging.debug("[OPS] stream is creating at {}".format(time.time())) Logging.debug("[OPS] stream is creating at {}".format(time.time()))
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [0x03f0]: # stream already exists
# stream need drop before drop table
pass
else:
pass
else:
stream_sql = 'create stream {} into {}.{} as select {}, avg(speed) FROM {}.{} PARTITION BY tbname INTERVAL(5s) SLIDING(3s) '.format(super_stream_name,dbname,super_stream_tb_name,aggExpr, dbname,stbname)
try:
self.execWtSql(wt, stream_sql)
Logging.debug("[OPS] stream is creating at {}".format(time.time()))
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [0x03f0]: # stream already exists
# stream need drop before drop table
pass
class TaskCreateTopic(StateTransitionTask): class TaskCreateTopic(StateTransitionTask):
@ -1853,95 +1809,36 @@ class TaskCreateTopic(StateTransitionTask):
sTable = self._db.getFixedSuperTable() # type: TdSuperTable sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place # wt.execSql("use db") # should always be in place
# create topic
# create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1; # create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1;
stbname =sTable.getName() stbname =sTable.getName()
sub_tables = sTable.getRegTables(wt.getDbConn()) sub_tables = sTable.getRegTables(wt.getDbConn())
scalarExpr = Dice.choice([ '*','speed','color',
'abs(speed)',
'acos(speed)',
'asin(speed)',
'atan(speed)',
'ceil(speed)',
'cos(speed)',
'cos(speed)',
'floor(speed)',
'log(speed,2)',
'pow(speed,2)',
'round(speed)',
'sin(speed)',
'sqrt(speed)',
'char_length(color)',
'concat(color,color)',
'concat_ws(" ", color,color," ")',
'length(color)',
'lower(color)',
'ltrim(color)',
'substr(color , 2)',
'upper(color)',
'cast(speed as double)',
'cast(ts as bigint)',
]) # TODO: add more from 'top'
if Dice.throw(3)==0:
if sub_tables:
scalarExpr = Dice.choice([ '*','speed','color','abs(speed)','acos(speed)','asin(speed)','atan(speed)','ceil(speed)','cos(speed)','cos(speed)',
'floor(speed)','log(speed,2)','pow(speed,2)','round(speed)','sin(speed)','sqrt(speed)','char_length(color)','concat(color,color)',
'concat_ws(" ", color,color," ")','length(color)', 'lower(color)', 'ltrim(color)','substr(color , 2)','upper(color)','cast(speed as double)',
'cast(ts as bigint)'])
topic_sql = '' # set default value
if Dice.throw(3)==0: # create topic : source data from sub query
if sub_tables: # if not empty if sub_tables: # if not empty
sub_tbname = sub_tables[0] sub_tbname = sub_tables[0]
# create stream with query above sub_table # create topic : source data from sub query of sub stable
topic_sql = 'create topic {} as select {} FROM {}.{} ; '.format(sub_topic_name,scalarExpr,dbname,sub_tbname) topic_sql = 'create topic {} as select {} FROM {}.{} ; '.format(sub_topic_name,scalarExpr,dbname,sub_tbname)
try:
self.execWtSql(wt, "use {}".format(dbname))
self.execWtSql(wt, topic_sql)
Logging.debug("[OPS] topic is creating at {}".format(time.time()))
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [0x03f0 ]: # topic already exists
# topic need drop before drop table
pass
else: else: # create topic : source data from sub query of stable
pass
else:
topic_sql = 'create topic {} as select {} FROM {}.{} '.format(super_topic_name,scalarExpr, dbname,stbname) topic_sql = 'create topic {} as select {} FROM {}.{} '.format(super_topic_name,scalarExpr, dbname,stbname)
try: elif Dice.throw(3)==1: # create topic : source data from super table
self.execWtSql(wt, "use {}".format(dbname))
self.execWtSql(wt, topic_sql)
Logging.debug("[OPS] subquery topic is creating at {}".format(time.time()))
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [0x03f0]: # topic already exists
# topic need drop before drop table
pass
elif Dice.throw(3)==1:
topic_sql = 'create topic {} AS STABLE {}.{} '.format(stable_topic,dbname,stbname) topic_sql = 'create topic {} AS STABLE {}.{} '.format(stable_topic,dbname,stbname)
try:
self.execWtSql(wt, "use {}".format(dbname)) elif Dice.throw(3)==2: # create topic : source data from whole database
self.queryWtSql(wt, topic_sql)
Logging.debug("[OPS] stable topic is creating at {}".format(time.time()))
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [0x03f0]: # topic already exists
# topic need drop before drop table
pass
elif Dice.throw(3)==2:
topic_sql = 'create topic {} AS DATABASE {} '.format(db_topic,dbname) topic_sql = 'create topic {} AS DATABASE {} '.format(db_topic,dbname)
try: else:
pass
# exec create topics
self.execWtSql(wt, "use {}".format(dbname)) self.execWtSql(wt, "use {}".format(dbname))
self.execWtSql(wt, topic_sql) self.execWtSql(wt, topic_sql)
Logging.debug("[OPS] db topic is creating at {}".format(time.time())) Logging.debug("[OPS] db topic is creating at {}".format(time.time()))
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [0x03f0]: # topic already exists
# topic need drop before drop table
pass
else:
pass
class TaskDropTopics(StateTransitionTask): class TaskDropTopics(StateTransitionTask):
@ -1991,7 +1888,6 @@ class TaskDropStreams(StateTransitionTask):
# tblName = sTable.getName() # tblName = sTable.getName()
if sTable.hasStreams(wt.getDbConn()): if sTable.hasStreams(wt.getDbConn()):
sTable.dropStreams(wt.getDbConn()) # drop stream of database sTable.dropStreams(wt.getDbConn()) # drop stream of database
# sTable.dropStreamTables(wt.getDbConn()) # drop streamtables of stable
class TaskDropStreamTables(StateTransitionTask): class TaskDropStreamTables(StateTransitionTask):
@ -2012,10 +1908,9 @@ class TaskDropStreamTables(StateTransitionTask):
return return
sTable = self._db.getFixedSuperTable() # type: TdSuperTable sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place wt.execSql("use db") # should always be in place
# tblName = sTable.getName() # tblName = sTable.getName()
if sTable.hasStreamTables(wt.getDbConn()): if sTable.hasStreamTables(wt.getDbConn()):
# sTable.dropStreams(wt.getDbConn())
sTable.dropStreamTables(wt.getDbConn()) # drop stream tables sTable.dropStreamTables(wt.getDbConn()) # drop stream tables
class TaskCreateConsumers(StateTransitionTask): class TaskCreateConsumers(StateTransitionTask):
@ -2031,13 +1926,9 @@ class TaskCreateConsumers(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
if Config.getConfig().connector_type == 'native': if Config.getConfig().connector_type == 'native':
dbname = self._db.getName()
sTable = self._db.getFixedSuperTable() # type: TdSuperTable sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place # wt.execSql("use db") # should always be in place
# create Consumers
# if Dice.throw(50)==0: # because subscribe is cost so much time , Reduce frequency of this task
if sTable.hasTopics(wt.getDbConn()): if sTable.hasTopics(wt.getDbConn()):
sTable.createConsumer(wt.getDbConn(),random.randint(1,10)) sTable.createConsumer(wt.getDbConn(),random.randint(1,10))
pass pass
@ -2088,30 +1979,13 @@ class TdSuperTable:
dbName = self._dbName dbName = self._dbName
if self.exists(dbc) : # if myself exists if self.exists(dbc) : # if myself exists
fullTableName = dbName + '.' + self._stName fullTableName = dbName + '.' + self._stName
# if self.hasStreams(dbc):
# self.dropStreams(dbc)
# self.dropStreamTables(dbc)
# if self.hasTopics(dbc):
# self.dropTopics(dbName,None)
# self.dropTopics(dbName,self._stName)
try: try:
dbc.execute("DROP TABLE {}".format(fullTableName)) dbc.execute("DROP TABLE {}".format(fullTableName))
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno) errno = Helper.convertErrno(err.errno)
if errno in [1011,0x3F3,0x03f3,0x2662]: # table doesn't exist # Stream must be dropped first, SQL: DROP TABLE db_0.fs_table if errno in [1011,0x3F3,0x03f3,0x2662]: # table doesn't exist # Stream must be dropped first, SQL: DROP TABLE db_0.fs_table
pass pass
# # stream need drop before drop table
# for stream in self.getStreamName():
# drop_stream_sql = 'drop stream {}'.format(stream)
# try:
# dbc.execute(drop_stream_sql)
# except taos.error.ProgrammingError as err:
# # correcting for strange error number scheme
# errno3 = Helper.convertErrno(err.errno)
# if errno3 in [1011,0x3F3,0x03f3,0x2662,0x03f1]: # stream not exists
# pass
# dbc.execute("DROP TABLE {}".format(fullTableName))
# pass
else: else:
if not skipCheck: if not skipCheck:
@ -2131,15 +2005,6 @@ class TdSuperTable:
if dbc.existsSuperTable(self._stName): if dbc.existsSuperTable(self._stName):
if dropIfExists: if dropIfExists:
# if self.hasStreams(dbc):
# self.dropStreams(dbc)
# self.dropStreamTables(dbc)
# # drop topics before drop stables
# if self.hasTopics(dbc):
# self.dropTopics(dbc,self._dbName,None)
# self.dropTopics(dbc,self._dbName,self._stName )
try: try:
dbc.execute("DROP TABLE {}".format(fullTableName)) dbc.execute("DROP TABLE {}".format(fullTableName))
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
@ -2164,7 +2029,7 @@ class TdSuperTable:
sql += " TAGS (dummy int) " sql += " TAGS (dummy int) "
dbc.execute(sql) dbc.execute(sql)
def createConsumer(self, dbc: DbConn , Consumer_nums): def createConsumer(self, dbc,Consumer_nums):
def generateConsumer(current_topic_list): def generateConsumer(current_topic_list):
conf = TaosTmqConf() conf = TaosTmqConf()
@ -2183,7 +2048,14 @@ class TdSuperTable:
consumer.subscribe(topic_list) consumer.subscribe(topic_list)
except TmqError as e : except TmqError as e :
pass pass
time.sleep(5) # consumer work only 5 sec ,and then it will exit
# consumer work only 30 sec
time_start = time.time()
while 1:
res = consumer.poll(1000)
if time.time() - time_start >5 :
break
# time.sleep(10)
try: try:
consumer.unsubscribe() consumer.unsubscribe()
except TmqError as e : except TmqError as e :
@ -2579,29 +2451,9 @@ class TaskDropSuperTable(StateTransitionTask):
# Drop the super table itself # Drop the super table itself
tblName = self._db.getFixedSuperTableName() tblName = self._db.getFixedSuperTableName()
# # drop streams before drop stables
# if self._db.getFixedSuperTable().hasStreams(wt.getDbConn()):
# self._db.getFixedSuperTable().dropStreams(wt.getDbConn())
# self._db.getFixedSuperTable().dropStreamTables(wt.getDbConn())
# # drop topics before drop stables
# if self._db.getFixedSuperTable().hasTopics(wt.getDbConn()):
# self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),None)
# self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),tblName)
try:
self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName)) self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName))
except taos.error.ProgrammingError as err:
# correcting for strange error number scheme
errno2 = Helper.convertErrno(err.errno)
if (errno2 in [0x362]): # mnode invalid table name
isSuccess = False
Logging.debug("[DB] Acceptable error when dropping a table")
elif errno2 in [1011,0x3F3,0x03f3]: # table doesn't exist
pass
class TaskAlterTags(StateTransitionTask): class TaskAlterTags(StateTransitionTask):