update
This commit is contained in:
parent
ca00459abb
commit
59f50ae8c5
|
@ -37,6 +37,7 @@ import requests
|
||||||
# from guppy import hpy
|
# from guppy import hpy
|
||||||
import gc
|
import gc
|
||||||
import taos
|
import taos
|
||||||
|
from taos.tmq import *
|
||||||
|
|
||||||
|
|
||||||
from .shared.types import TdColumns, TdTags
|
from .shared.types import TdColumns, TdTags
|
||||||
|
@ -675,9 +676,10 @@ class AnyState:
|
||||||
CAN_DROP_DB = 2
|
CAN_DROP_DB = 2
|
||||||
CAN_CREATE_FIXED_SUPER_TABLE = 3
|
CAN_CREATE_FIXED_SUPER_TABLE = 3
|
||||||
CAN_CREATE_STREAM = 3 # super table must exists
|
CAN_CREATE_STREAM = 3 # super table must exists
|
||||||
|
CAN_CREATE_TOPIC = 3 # super table must exists
|
||||||
|
CAN_CREATE_CONSUMERS = 3
|
||||||
CAN_DROP_FIXED_SUPER_TABLE = 4
|
CAN_DROP_FIXED_SUPER_TABLE = 4
|
||||||
CAN_ADD_DATA = 5
|
CAN_ADD_DATA = 5
|
||||||
CAN_DROP_STREAM = 5
|
|
||||||
CAN_READ_DATA = 6
|
CAN_READ_DATA = 6
|
||||||
CAN_DELETE_DATA = 6
|
CAN_DELETE_DATA = 6
|
||||||
|
|
||||||
|
@ -730,6 +732,12 @@ class AnyState:
|
||||||
return False
|
return False
|
||||||
return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]
|
return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]
|
||||||
|
|
||||||
|
def canCreateTopic(self):
|
||||||
|
return self._info[self.CAN_CREATE_TOPIC]
|
||||||
|
|
||||||
|
def canCreateConsumers(self):
|
||||||
|
return self._info[self.CAN_CREATE_CONSUMERS]
|
||||||
|
|
||||||
def canCreateStream(self):
|
def canCreateStream(self):
|
||||||
return self._info[self.CAN_CREATE_STREAM]
|
return self._info[self.CAN_CREATE_STREAM]
|
||||||
|
|
||||||
|
@ -1679,9 +1687,22 @@ class TaskDropDb(StateTransitionTask):
|
||||||
return state.canDropDb()
|
return state.canDropDb()
|
||||||
|
|
||||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||||
|
|
||||||
|
# drop topics before drop db
|
||||||
|
|
||||||
|
if self._db.getFixedSuperTable().hasTopics(wt.getDbConn()):
|
||||||
|
|
||||||
|
self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),None)
|
||||||
|
self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),self._db.getFixedSuperTableName)
|
||||||
|
|
||||||
self.execWtSql(wt, "drop database {}".format(self._db.getName()))
|
self.execWtSql(wt, "drop database {}".format(self._db.getName()))
|
||||||
|
|
||||||
Logging.debug("[OPS] database dropped at {}".format(time.time()))
|
Logging.debug("[OPS] database dropped at {}".format(time.time()))
|
||||||
|
|
||||||
|
|
||||||
|
'''
|
||||||
|
# Streams will generator TD-20237 (it will crash taosd , start this task when this issue fixed )
|
||||||
|
|
||||||
class TaskCreateStream(StateTransitionTask):
|
class TaskCreateStream(StateTransitionTask):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -1707,9 +1728,9 @@ class TaskCreateStream(StateTransitionTask):
|
||||||
# wt.execSql("use db") # should always be in place
|
# wt.execSql("use db") # should always be in place
|
||||||
|
|
||||||
# create stream
|
# create stream
|
||||||
'''
|
|
||||||
CREATE STREAM avg_vol_s INTO avg_vol AS SELECT _wstartts, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);
|
# CREATE STREAM avg_vol_s INTO avg_vol AS SELECT _wstartts, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);
|
||||||
'''
|
|
||||||
stbname =sTable.getName()
|
stbname =sTable.getName()
|
||||||
sub_tables = sTable.getRegTables(wt.getDbConn())
|
sub_tables = sTable.getRegTables(wt.getDbConn())
|
||||||
aggExpr = Dice.choice([
|
aggExpr = Dice.choice([
|
||||||
|
@ -1735,7 +1756,7 @@ class TaskCreateStream(StateTransitionTask):
|
||||||
if sub_tables: # if not empty
|
if sub_tables: # if not empty
|
||||||
sub_tbname = sub_tables[0]
|
sub_tbname = sub_tables[0]
|
||||||
# create stream with query above sub_table
|
# create stream with query above sub_table
|
||||||
stream_sql = 'create stream {} into {}.{} as select {}, avg(speed) FROM {}.{} where ts <now and ts >now -1h PARTITION BY tbname INTERVAL(5s) SLIDING(3s) FILL (prev) '.format(sub_stream_name,dbname,sub_stream_tb_name ,aggExpr,dbname,sub_tbname)
|
stream_sql = 'create stream {} into {}.{} as select {}, avg(speed) FROM {}.{} PARTITION BY tbname INTERVAL(5s) SLIDING(3s) '.format(sub_stream_name,dbname,sub_stream_tb_name ,aggExpr,dbname,sub_tbname)
|
||||||
try:
|
try:
|
||||||
self.execWtSql(wt, stream_sql)
|
self.execWtSql(wt, stream_sql)
|
||||||
Logging.debug("[OPS] stream is creating at {}".format(time.time()))
|
Logging.debug("[OPS] stream is creating at {}".format(time.time()))
|
||||||
|
@ -1749,7 +1770,7 @@ class TaskCreateStream(StateTransitionTask):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
else:
|
else:
|
||||||
stream_sql = 'create stream {} into {}.{} as select {}, avg(speed) FROM {}.{} where ts <now and ts >now -1h PARTITION BY tbname INTERVAL(5s) SLIDING(3s) FILL (prev) '.format(super_stream_name,dbname,super_stream_tb_name,aggExpr, dbname,stbname)
|
stream_sql = 'create stream {} into {}.{} as select {}, avg(speed) FROM {}.{} PARTITION BY tbname INTERVAL(5s) SLIDING(3s) '.format(super_stream_name,dbname,super_stream_tb_name,aggExpr, dbname,stbname)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.execWtSql(wt, stream_sql)
|
self.execWtSql(wt, stream_sql)
|
||||||
|
@ -1759,8 +1780,142 @@ class TaskCreateStream(StateTransitionTask):
|
||||||
if errno in [0x03f0]: # stream already exists
|
if errno in [0x03f0]: # stream already exists
|
||||||
# stream need drop before drop table
|
# stream need drop before drop table
|
||||||
pass
|
pass
|
||||||
|
'''
|
||||||
|
|
||||||
|
class TaskCreateTopic(StateTransitionTask):
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def getEndState(cls):
|
||||||
|
return StateHasData()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def canBeginFrom(cls, state: AnyState):
|
||||||
|
return state.canCreateTopic()
|
||||||
|
|
||||||
|
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||||
|
dbname = self._db.getName()
|
||||||
|
|
||||||
|
sub_topic_name = dbname+ '_sub_topic'
|
||||||
|
super_topic_name = dbname+ '_super_topic'
|
||||||
|
stable_topic = dbname+ '_stable_topic'
|
||||||
|
db_topic = 'database_' + dbname+ '_topics'
|
||||||
|
if not self._db.exists(wt.getDbConn()):
|
||||||
|
Logging.debug("Skipping task, no DB yet")
|
||||||
|
return
|
||||||
|
|
||||||
|
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
|
||||||
|
# wt.execSql("use db") # should always be in place
|
||||||
|
|
||||||
|
# create topic
|
||||||
|
|
||||||
|
# create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1;
|
||||||
|
|
||||||
|
stbname =sTable.getName()
|
||||||
|
sub_tables = sTable.getRegTables(wt.getDbConn())
|
||||||
|
scalarExpr = Dice.choice([ '*','speed','color',
|
||||||
|
'abs(speed)',
|
||||||
|
'acos(speed)',
|
||||||
|
'asin(speed)',
|
||||||
|
'atan(speed)',
|
||||||
|
'ceil(speed)',
|
||||||
|
'cos(speed)',
|
||||||
|
'cos(speed)',
|
||||||
|
'floor(speed)',
|
||||||
|
'log(speed,2)',
|
||||||
|
'pow(speed,2)',
|
||||||
|
'round(speed)',
|
||||||
|
'sin(speed)',
|
||||||
|
'sqrt(speed)',
|
||||||
|
'char_length(color)',
|
||||||
|
'concat(color,color)',
|
||||||
|
'concat_ws(" ", color,color," ")',
|
||||||
|
'length(color)',
|
||||||
|
'lower(color)',
|
||||||
|
'ltrim(color)',
|
||||||
|
'substr(color , 2)',
|
||||||
|
'upper(color)',
|
||||||
|
'cast(speed as double)',
|
||||||
|
'cast(ts as bigint)',
|
||||||
|
|
||||||
|
]) # TODO: add more from 'top'
|
||||||
|
if Dice.throw(3)==0:
|
||||||
|
|
||||||
|
if sub_tables:
|
||||||
|
|
||||||
|
if sub_tables: # if not empty
|
||||||
|
sub_tbname = sub_tables[0]
|
||||||
|
# create stream with query above sub_table
|
||||||
|
topic_sql = 'create topic {} as select {} FROM {}.{} ; '.format(sub_topic_name,scalarExpr,dbname,sub_tbname)
|
||||||
|
try:
|
||||||
|
self.execWtSql(wt, "use {}".format(dbname))
|
||||||
|
self.execWtSql(wt, topic_sql)
|
||||||
|
Logging.debug("[OPS] topic is creating at {}".format(time.time()))
|
||||||
|
except taos.error.ProgrammingError as err:
|
||||||
|
errno = Helper.convertErrno(err.errno)
|
||||||
|
if errno in [0x03f0]: # topic already exists
|
||||||
|
# topic need drop before drop table
|
||||||
|
pass
|
||||||
|
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
|
||||||
|
else:
|
||||||
|
topic_sql = 'create topic {} as select {} FROM {}.{} '.format(super_topic_name,scalarExpr, dbname,stbname)
|
||||||
|
try:
|
||||||
|
self.execWtSql(wt, "use {}".format(dbname))
|
||||||
|
self.execWtSql(wt, topic_sql)
|
||||||
|
Logging.debug("[OPS] subquery topic is creating at {}".format(time.time()))
|
||||||
|
except taos.error.ProgrammingError as err:
|
||||||
|
errno = Helper.convertErrno(err.errno)
|
||||||
|
if errno in [0x03f0]: # topic already exists
|
||||||
|
# topic need drop before drop table
|
||||||
|
pass
|
||||||
|
elif Dice.throw(3)==1:
|
||||||
|
topic_sql = 'create topic {} AS STABLE {}.{} '.format(stable_topic,dbname,stbname)
|
||||||
|
try:
|
||||||
|
self.execWtSql(wt, "use {}".format(dbname))
|
||||||
|
self.execWtSql(wt, topic_sql)
|
||||||
|
Logging.debug("[OPS] stable topic is creating at {}".format(time.time()))
|
||||||
|
except taos.error.ProgrammingError as err:
|
||||||
|
errno = Helper.convertErrno(err.errno)
|
||||||
|
if errno in [0x03f0]: # topic already exists
|
||||||
|
# topic need drop before drop table
|
||||||
|
pass
|
||||||
|
elif Dice.throw(3)==2:
|
||||||
|
topic_sql = 'create topic {} AS DATABASE {} '.format(db_topic,dbname)
|
||||||
|
try:
|
||||||
|
self.execWtSql(wt, "use {}".format(dbname))
|
||||||
|
self.execWtSql(wt, topic_sql)
|
||||||
|
Logging.debug("[OPS] db topic is creating at {}".format(time.time()))
|
||||||
|
except taos.error.ProgrammingError as err:
|
||||||
|
errno = Helper.convertErrno(err.errno)
|
||||||
|
if errno in [0x03f0]: # topic already exists
|
||||||
|
# topic need drop before drop table
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class TaskCreateConsumers(StateTransitionTask):
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def getEndState(cls):
|
||||||
|
return StateHasData()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def canBeginFrom(cls, state: AnyState):
|
||||||
|
return state.canCreateConsumers()
|
||||||
|
|
||||||
|
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||||
|
dbname = self._db.getName()
|
||||||
|
|
||||||
|
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
|
||||||
|
# wt.execSql("use db") # should always be in place
|
||||||
|
|
||||||
|
# create Consumers
|
||||||
|
if Dice.throw(50)==0: # because subscribe is cost so much time , Reduce frequency of this task
|
||||||
|
if sTable.hasTopics(wt.getDbConn()):
|
||||||
|
sTable.createConsumer(wt.getDbConn(),random.randint(1,10))
|
||||||
|
|
||||||
|
|
||||||
class TaskCreateSuperTable(StateTransitionTask):
|
class TaskCreateSuperTable(StateTransitionTask):
|
||||||
|
@ -1780,9 +1935,6 @@ class TaskCreateSuperTable(StateTransitionTask):
|
||||||
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
|
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
|
||||||
# wt.execSql("use db") # should always be in place
|
# wt.execSql("use db") # should always be in place
|
||||||
|
|
||||||
if sTable.hasStreams(wt.getDbConn()) or sTable.hasStreamTables(wt.getDbConn()):
|
|
||||||
sTable.dropStreams(wt.getDbConn())
|
|
||||||
sTable.dropStreamTables(wt.getDbConn())
|
|
||||||
sTable.create(wt.getDbConn(),
|
sTable.create(wt.getDbConn(),
|
||||||
{'ts': TdDataType.TIMESTAMP, 'speed': TdDataType.INT, 'color': TdDataType.BINARY16}, {
|
{'ts': TdDataType.TIMESTAMP, 'speed': TdDataType.INT, 'color': TdDataType.BINARY16}, {
|
||||||
'b': TdDataType.BINARY200, 'f': TdDataType.FLOAT},
|
'b': TdDataType.BINARY200, 'f': TdDataType.FLOAT},
|
||||||
|
@ -1797,6 +1949,8 @@ class TdSuperTable:
|
||||||
def __init__(self, stName, dbName):
|
def __init__(self, stName, dbName):
|
||||||
self._stName = stName
|
self._stName = stName
|
||||||
self._dbName = dbName
|
self._dbName = dbName
|
||||||
|
self._consumerLists = {}
|
||||||
|
self._ConsumerInsts = []
|
||||||
|
|
||||||
def getName(self):
|
def getName(self):
|
||||||
return self._stName
|
return self._stName
|
||||||
|
@ -1806,6 +1960,12 @@ class TdSuperTable:
|
||||||
dbName = self._dbName
|
dbName = self._dbName
|
||||||
if self.exists(dbc) : # if myself exists
|
if self.exists(dbc) : # if myself exists
|
||||||
fullTableName = dbName + '.' + self._stName
|
fullTableName = dbName + '.' + self._stName
|
||||||
|
if self.hasStreams(dbc):
|
||||||
|
self.dropStreams(dbc)
|
||||||
|
self.dropStreamTables(dbc)
|
||||||
|
if self.hasTopics(dbc):
|
||||||
|
self.dropTopics(dbName,None)
|
||||||
|
self.dropTopics(dbName,self._stName)
|
||||||
try:
|
try:
|
||||||
dbc.execute("DROP TABLE {}".format(fullTableName))
|
dbc.execute("DROP TABLE {}".format(fullTableName))
|
||||||
except taos.error.ProgrammingError as err:
|
except taos.error.ProgrammingError as err:
|
||||||
|
@ -1843,12 +2003,19 @@ class TdSuperTable:
|
||||||
|
|
||||||
if dbc.existsSuperTable(self._stName):
|
if dbc.existsSuperTable(self._stName):
|
||||||
if dropIfExists:
|
if dropIfExists:
|
||||||
|
if self.hasStreams(dbc):
|
||||||
|
self.dropStreams(dbc)
|
||||||
|
self.dropStreamTables(dbc)
|
||||||
|
|
||||||
|
# drop topics before drop stables
|
||||||
|
if self.hasTopics(dbc):
|
||||||
|
self.dropTopics(dbc,self._dbName,None)
|
||||||
|
self.dropTopics(dbc,self._dbName,self._stName )
|
||||||
|
|
||||||
|
|
||||||
dbc.execute("DROP TABLE {}".format(fullTableName))
|
dbc.execute("DROP TABLE {}".format(fullTableName))
|
||||||
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
# dbc.execute("DROP TABLE {}".format(fullTableName))
|
# dbc.execute("DROP TABLE {}".format(fullTableName))
|
||||||
else: # error
|
else: # error
|
||||||
raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName))
|
raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName))
|
||||||
|
@ -1865,6 +2032,46 @@ class TdSuperTable:
|
||||||
sql += " TAGS (dummy int) "
|
sql += " TAGS (dummy int) "
|
||||||
dbc.execute(sql)
|
dbc.execute(sql)
|
||||||
|
|
||||||
|
def createConsumer(self, dbc: DbConn , Consumer_nums):
|
||||||
|
|
||||||
|
def generateConsumer(current_topic_list):
|
||||||
|
conf = TaosTmqConf()
|
||||||
|
conf.set("group.id", "tg2")
|
||||||
|
conf.set("td.connect.user", "root")
|
||||||
|
conf.set("td.connect.pass", "taosdata")
|
||||||
|
conf.set("enable.auto.commit", "true")
|
||||||
|
def tmq_commit_cb_print(tmq, resp, offset, param=None):
|
||||||
|
print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
|
||||||
|
conf.set_auto_commit_cb(tmq_commit_cb_print, None)
|
||||||
|
consumer = conf.new_consumer()
|
||||||
|
topic_list = TaosTmqList()
|
||||||
|
for topic in current_topic_list:
|
||||||
|
topic_list.append(topic)
|
||||||
|
consumer.subscribe(topic_list)
|
||||||
|
time.sleep(5) # consumer work only 5 sec ,and then it will exit
|
||||||
|
try:
|
||||||
|
consumer.unsubscribe()
|
||||||
|
except Exception as e :
|
||||||
|
pass
|
||||||
|
return
|
||||||
|
|
||||||
|
# mulit Consumer
|
||||||
|
current_topic_list = self.getTopicLists(dbc)
|
||||||
|
for i in range(Consumer_nums):
|
||||||
|
consumer_inst = threading.Thread(target=generateConsumer, args=(current_topic_list,))
|
||||||
|
self._ConsumerInsts.append(consumer_inst)
|
||||||
|
|
||||||
|
for ConsumerInst in self._ConsumerInsts:
|
||||||
|
ConsumerInst.start()
|
||||||
|
for ConsumerInst in self._ConsumerInsts:
|
||||||
|
ConsumerInst.join()
|
||||||
|
|
||||||
|
def getTopicLists(self, dbc: DbConn):
|
||||||
|
dbc.query("show topics ")
|
||||||
|
topics = dbc.getQueryResult()
|
||||||
|
topicLists = [v[0] for v in topics]
|
||||||
|
return topicLists
|
||||||
|
|
||||||
def getRegTables(self, dbc: DbConn):
|
def getRegTables(self, dbc: DbConn):
|
||||||
dbName = self._dbName
|
dbName = self._dbName
|
||||||
try:
|
try:
|
||||||
|
@ -1878,12 +2085,7 @@ class TdSuperTable:
|
||||||
return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation
|
return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation
|
||||||
|
|
||||||
def hasRegTables(self, dbc: DbConn):
|
def hasRegTables(self, dbc: DbConn):
|
||||||
# print(self._stName)
|
|
||||||
# dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName))
|
|
||||||
# print(dbc.getQueryResult())
|
|
||||||
if self.hasStreamTables(dbc) or self.hasStreams(dbc):
|
|
||||||
if self.dropStreams(dbc):
|
|
||||||
self.dropStreamTables(dbc)
|
|
||||||
if dbc.existsSuperTable(self._stName):
|
if dbc.existsSuperTable(self._stName):
|
||||||
|
|
||||||
return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0
|
return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0
|
||||||
|
@ -1897,6 +2099,40 @@ class TdSuperTable:
|
||||||
def hasStreams(self,dbc: DbConn):
|
def hasStreams(self,dbc: DbConn):
|
||||||
return dbc.query("show streams") > 0
|
return dbc.query("show streams") > 0
|
||||||
|
|
||||||
|
def hasTopics(self,dbc: DbConn):
|
||||||
|
|
||||||
|
return dbc.query("show topics") > 0
|
||||||
|
|
||||||
|
def dropTopics(self,dbc: DbConn , dbname=None,stb_name=None):
|
||||||
|
dbc.query("show topics ")
|
||||||
|
topics = dbc.getQueryResult()
|
||||||
|
|
||||||
|
if dbname !=None and stb_name == None :
|
||||||
|
|
||||||
|
for topic in topics:
|
||||||
|
if dbname in topic[0] and topic[0].startswith("database"):
|
||||||
|
try:
|
||||||
|
dbc.execute('drop topic {}'.format(topic[0]))
|
||||||
|
except taos.error.ProgrammingError as err:
|
||||||
|
errno = Helper.convertErrno(err.errno)
|
||||||
|
if errno in [0x03EB]: # Topic subscribed cannot be dropped
|
||||||
|
pass
|
||||||
|
# for subsript in subscriptions:
|
||||||
|
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
|
||||||
|
pass
|
||||||
|
return True
|
||||||
|
elif dbname !=None and stb_name!= None:
|
||||||
|
for topic in topics:
|
||||||
|
if topic[0].startswith(self._dbName) and topic[0].endswith('topic'):
|
||||||
|
dbc.execute('drop topic {}'.format(topic[0]))
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
pass
|
||||||
|
|
||||||
def dropStreams(self,dbc:DbConn):
|
def dropStreams(self,dbc:DbConn):
|
||||||
dbc.query("show streams ")
|
dbc.query("show streams ")
|
||||||
Streams = dbc.getQueryResult()
|
Streams = dbc.getQueryResult()
|
||||||
|
@ -1912,7 +2148,7 @@ class TdSuperTable:
|
||||||
StreamTables = dbc.getQueryResult()
|
StreamTables = dbc.getQueryResult()
|
||||||
|
|
||||||
for StreamTable in StreamTables:
|
for StreamTable in StreamTables:
|
||||||
if self.dropStreams:
|
if self.dropStreams(dbc):
|
||||||
dbc.execute('drop table {}.{}'.format(self._dbName,StreamTable[0]))
|
dbc.execute('drop table {}.{}'.format(self._dbName,StreamTable[0]))
|
||||||
|
|
||||||
return not dbc.query("show {}.stables like 'stream_tb%'".format(self._dbName))
|
return not dbc.query("show {}.stables like 'stream_tb%'".format(self._dbName))
|
||||||
|
@ -2085,10 +2321,6 @@ class TdSuperTable:
|
||||||
'STATEDURATION(speed,"LT",1)',
|
'STATEDURATION(speed,"LT",1)',
|
||||||
'twa(speed)'
|
'twa(speed)'
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
]) # TODO: add more from 'top'
|
]) # TODO: add more from 'top'
|
||||||
|
|
||||||
|
|
||||||
|
@ -2194,6 +2426,7 @@ class TaskDropSuperTable(StateTransitionTask):
|
||||||
for i in tblSeq:
|
for i in tblSeq:
|
||||||
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
|
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
|
||||||
try:
|
try:
|
||||||
|
|
||||||
self.execWtSql(wt, "drop table {}.{}".
|
self.execWtSql(wt, "drop table {}.{}".
|
||||||
format(self._db.getName(), regTableName)) # nRows always 0, like MySQL
|
format(self._db.getName(), regTableName)) # nRows always 0, like MySQL
|
||||||
except taos.error.ProgrammingError as err:
|
except taos.error.ProgrammingError as err:
|
||||||
|
@ -2209,6 +2442,17 @@ class TaskDropSuperTable(StateTransitionTask):
|
||||||
|
|
||||||
# Drop the super table itself
|
# Drop the super table itself
|
||||||
tblName = self._db.getFixedSuperTableName()
|
tblName = self._db.getFixedSuperTableName()
|
||||||
|
|
||||||
|
# drop streams before drop stables
|
||||||
|
if self._db.getFixedSuperTable().hasStreams(wt.getDbConn()):
|
||||||
|
self._db.getFixedSuperTable().dropStreams(wt.getDbConn())
|
||||||
|
self._db.getFixedSuperTable().dropStreamTables(wt.getDbConn())
|
||||||
|
|
||||||
|
# drop topics before drop stables
|
||||||
|
if self._db.getFixedSuperTable().hasTopics(wt.getDbConn()):
|
||||||
|
self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),None)
|
||||||
|
self._db.getFixedSuperTable().dropTopics(wt.getDbConn(),self._db.getName(),tblName)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName))
|
self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName))
|
||||||
except taos.error.ProgrammingError as err:
|
except taos.error.ProgrammingError as err:
|
||||||
|
|
Loading…
Reference in New Issue