This commit is contained in:
wenzhouwww@live.cn 2022-11-08 10:54:57 +08:00
parent 8a5427c95b
commit 7aac106650
1 changed files with 87 additions and 62 deletions

View File

@ -1712,30 +1712,66 @@ class TaskCreateStream(StateTransitionTask):
''' '''
stbname =sTable.getName() stbname =sTable.getName()
sub_tables = sTable.getRegTables(wt.getDbConn()) sub_tables = sTable.getRegTables(wt.getDbConn())
aggExpr = Dice.choice([
'count(*)',
'avg(speed)',
# 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
'sum(speed)',
'stddev(speed)',
# SELECTOR functions
'min(speed)',
'max(speed)',
'first(speed)',
'last(speed)',
'top(speed, 50)', # TODO: not supported?
'bottom(speed, 50)', # TODO: not supported?
'apercentile(speed, 10)', # TODO: TD-1316
'last_row(*)', # TODO: commented out per TD-3231, we should re-create
# Transformation Functions
# 'diff(speed)', # TODO: no supported?!
'spread(speed)',
'elapsed(ts)',
'mode(speed)',
'bottom(speed,1)',
'top(speed,1)',
'tail(speed,1)',
'unique(color)',
'csum(speed)',
'DERIVATIVE(speed,1s,1)',
'diff(speed,1)',
'irate(speed)',
'mavg(speed,3)',
'sample(speed,5)',
'STATECOUNT(speed,"LT",1)',
'STATEDURATION(speed,"LT",1)',
'twa(speed)'
]) # TODO: add more from 'top'
if sub_tables: if sub_tables:
if sub_tables: # if not empty if sub_tables: # if not empty
sub_tbname = sub_tables[0] sub_tbname = sub_tables[0]
# create stream with query above sub_table # create stream with query above sub_table
stream_sql = 'create stream {} into {}.{} as select count(*), avg(speed) FROM {}.{} PARTITION BY tbname INTERVAL(5s) SLIDING(3s) '.format(sub_stream_name,dbname,sub_stream_tb_name ,dbname,sub_tbname) stream_sql = 'create stream {} into {}.{} as select {}, avg(speed) FROM {}.{} PARTITION BY tbname INTERVAL(5s) SLIDING(3s) '.format(sub_stream_name,dbname,sub_stream_tb_name ,aggExpr,dbname,sub_tbname)
try: try:
self.execWtSql(wt, stream_sql) self.execWtSql(wt, stream_sql)
Logging.debug("[OPS] stream is creating at {}".format(time.time()))
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno) errno = Helper.convertErrno(err.errno)
if errno in [0x03f0]: # stream already exists if errno in [0x03f0]: # stream already exists
# stream need drop before drop table # stream need drop before drop table
pass pass
sTable.setStreamName(sub_stream_name)
else: else:
pass pass
else: else:
stream_sql = 'create stream {} into {}.{} as select count(*), avg(speed) FROM {}.{} PARTITION BY tbname INTERVAL(5s) SLIDING(3s) '.format(super_stream_name,dbname,super_stream_tb_name,dbname,stbname) stream_sql = 'create stream {} into {}.{} as select {}, avg(speed) FROM {}.{} PARTITION BY tbname INTERVAL(5s) SLIDING(3s) '.format(super_stream_name,dbname,super_stream_tb_name,aggExpr, dbname,stbname)
try: try:
self.execWtSql(wt, stream_sql) self.execWtSql(wt, stream_sql)
Logging.debug("[OPS] stream is creating at {}".format(time.time()))
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno) errno = Helper.convertErrno(err.errno)
if errno in [0x03f0]: # stream already exists if errno in [0x03f0]: # stream already exists
@ -1779,16 +1815,10 @@ class TdSuperTable:
def __init__(self, stName, dbName): def __init__(self, stName, dbName):
self._stName = stName self._stName = stName
self._dbName = dbName self._dbName = dbName
self._streamName = []
def getName(self): def getName(self):
return self._stName return self._stName
def setStreamName(self,name):
self._streamName.append(name)
def getStreamName(self):
return self._streamName
def drop(self, dbc, skipCheck = False): def drop(self, dbc, skipCheck = False):
dbName = self._dbName dbName = self._dbName
@ -2000,38 +2030,38 @@ class TdSuperTable:
if not doAggr: # don't do aggregate query, just simple one if not doAggr: # don't do aggregate query, just simple one
commonExpr = Dice.choice([ commonExpr = Dice.choice([
'*', '*',
# 'abs(speed)', 'abs(speed)',
# 'acos(speed)', 'acos(speed)',
# 'asin(speed)', 'asin(speed)',
# 'atan(speed)', 'atan(speed)',
# 'ceil(speed)', 'ceil(speed)',
# 'cos(speed)', 'cos(speed)',
# 'cos(speed)', 'cos(speed)',
# 'floor(speed)', 'floor(speed)',
# 'log(speed,2)', 'log(speed,2)',
# 'pow(speed,2)', 'pow(speed,2)',
# 'round(speed)', 'round(speed)',
# 'sin(speed)', 'sin(speed)',
# 'sqrt(speed)', 'sqrt(speed)',
# 'char_length(color)', 'char_length(color)',
# 'concat(color,color)', 'concat(color,color)',
# 'concat_ws(" ", color,color," ")', 'concat_ws(" ", color,color," ")',
# 'length(color)', 'length(color)',
# 'lower(color)', 'lower(color)',
# 'ltrim(color)', 'ltrim(color)',
# 'substr(color , 2)', 'substr(color , 2)',
# 'upper(color)', 'upper(color)',
# 'cast(speed as double)', 'cast(speed as double)',
# 'cast(ts as bigint)', 'cast(ts as bigint)',
# # 'TO_ISO8601(color)', # 'TO_ISO8601(color)',
# # 'TO_UNIXTIMESTAMP(ts)', # 'TO_UNIXTIMESTAMP(ts)',
# 'now()', 'now()',
# 'timediff(ts,now)', 'timediff(ts,now)',
# 'timezone()', 'timezone()',
# 'TIMETRUNCATE(ts,1s)', 'TIMETRUNCATE(ts,1s)',
# 'TIMEZONE()', 'TIMEZONE()',
# 'TODAY()', 'TODAY()',
# 'distinct(color)' 'distinct(color)'
] ]
) )
ret.append(SqlQuery( # reg table ret.append(SqlQuery( # reg table
@ -2057,21 +2087,21 @@ class TdSuperTable:
# Transformation Functions # Transformation Functions
# 'diff(speed)', # TODO: no supported?! # 'diff(speed)', # TODO: no supported?!
'spread(speed)', 'spread(speed)',
# 'elapsed(ts)', 'elapsed(ts)',
# 'mode(speed)', 'mode(speed)',
# 'bottom(speed,1)', 'bottom(speed,1)',
# 'top(speed,1)', 'top(speed,1)',
# 'tail(speed,1)', 'tail(speed,1)',
# 'unique(color)', 'unique(color)',
# 'csum(speed)', 'csum(speed)',
# 'DERIVATIVE(speed,1s,1)', 'DERIVATIVE(speed,1s,1)',
# 'diff(speed,1)', 'diff(speed,1)',
# 'irate(speed)', 'irate(speed)',
# 'mavg(speed,3)', 'mavg(speed,3)',
# 'sample(speed,5)', 'sample(speed,5)',
# 'STATECOUNT(speed,"LT",1)', 'STATECOUNT(speed,"LT",1)',
# 'STATEDURATION(speed,"LT",1)', 'STATEDURATION(speed,"LT",1)',
# 'twa(speed)' 'twa(speed)'
@ -2181,7 +2211,6 @@ class TaskDropSuperTable(StateTransitionTask):
isSuccess = True isSuccess = True
for i in tblSeq: for i in tblSeq:
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i) regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
streams_prix = self._db.getName()
try: try:
self.execWtSql(wt, "drop table {}.{}". self.execWtSql(wt, "drop table {}.{}".
format(self._db.getName(), regTableName)) # nRows always 0, like MySQL format(self._db.getName(), regTableName)) # nRows always 0, like MySQL
@ -2189,10 +2218,6 @@ class TaskDropSuperTable(StateTransitionTask):
pass pass
if (not tickOutput): if (not tickOutput):
tickOutput = True # Print only one time tickOutput = True # Print only one time
if isSuccess: if isSuccess: