update
This commit is contained in:
parent
30b1c576e1
commit
e1ca6a5e9c
|
@ -1905,18 +1905,23 @@ class TaskCreateConsumers(StateTransitionTask):
|
|||
|
||||
@classmethod
|
||||
def canBeginFrom(cls, state: AnyState):
|
||||
return state.canCreateConsumers()
|
||||
return state.canCreateConsumers()
|
||||
|
||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||
dbname = self._db.getName()
|
||||
|
||||
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
|
||||
# wt.execSql("use db") # should always be in place
|
||||
|
||||
# create Consumers
|
||||
if Dice.throw(50)==0: # because subscribe is cost so much time , Reduce frequency of this task
|
||||
if sTable.hasTopics(wt.getDbConn()):
|
||||
sTable.createConsumer(wt.getDbConn(),random.randint(1,10))
|
||||
if Config.getConfig().connector_type == 'native':
|
||||
dbname = self._db.getName()
|
||||
|
||||
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
|
||||
# wt.execSql("use db") # should always be in place
|
||||
|
||||
# create Consumers
|
||||
if Dice.throw(50)==0: # because subscribe is cost so much time , Reduce frequency of this task
|
||||
if sTable.hasTopics(wt.getDbConn()):
|
||||
sTable.createConsumer(wt.getDbConn(),random.randint(1,10))
|
||||
else:
|
||||
print(" restful not support tmq consumers")
|
||||
return
|
||||
|
||||
|
||||
class TaskCreateSuperTable(StateTransitionTask):
|
||||
|
|
Loading…
Reference in New Issue