updaate
This commit is contained in:
parent
05f0ac62e7
commit
3e133a8661
|
@ -255,7 +255,7 @@ class WorkerThread:
|
||||||
|
|
||||||
|
|
||||||
class ThreadCoordinator:
|
class ThreadCoordinator:
|
||||||
WORKER_THREAD_TIMEOUT = 1200 # Normal: 120
|
WORKER_THREAD_TIMEOUT = 120 # Normal: 120
|
||||||
|
|
||||||
def __init__(self, pool: ThreadPool, dbManager: DbManager):
|
def __init__(self, pool: ThreadPool, dbManager: DbManager):
|
||||||
self._curStep = -1 # first step is 0
|
self._curStep = -1 # first step is 0
|
||||||
|
@ -1374,6 +1374,7 @@ class Task():
|
||||||
0x707, # Query not ready
|
0x707, # Query not ready
|
||||||
0x396, # Database in creating status
|
0x396, # Database in creating status
|
||||||
0x386, # Database in droping status
|
0x386, # Database in droping status
|
||||||
|
0x03E1, # failed on tmq_subscribe ,topic not exist
|
||||||
|
|
||||||
|
|
||||||
1000 # REST catch-all error
|
1000 # REST catch-all error
|
||||||
|
@ -1905,18 +1906,23 @@ class TaskCreateConsumers(StateTransitionTask):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def canBeginFrom(cls, state: AnyState):
|
def canBeginFrom(cls, state: AnyState):
|
||||||
return state.canCreateConsumers()
|
return state.canCreateConsumers()
|
||||||
|
|
||||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||||
dbname = self._db.getName()
|
|
||||||
|
|
||||||
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
|
|
||||||
# wt.execSql("use db") # should always be in place
|
|
||||||
|
|
||||||
# create Consumers
|
if Config.getConfig().connector_type == 'native':
|
||||||
if Dice.throw(50)==0: # because subscribe is cost so much time , Reduce frequency of this task
|
dbname = self._db.getName()
|
||||||
if sTable.hasTopics(wt.getDbConn()):
|
|
||||||
sTable.createConsumer(wt.getDbConn(),random.randint(1,10))
|
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
|
||||||
|
# wt.execSql("use db") # should always be in place
|
||||||
|
|
||||||
|
# create Consumers
|
||||||
|
if Dice.throw(50)==0: # because subscribe is cost so much time , Reduce frequency of this task
|
||||||
|
if sTable.hasTopics(wt.getDbConn()):
|
||||||
|
sTable.createConsumer(wt.getDbConn(),random.randint(1,10))
|
||||||
|
else:
|
||||||
|
print(" restful not support tmq consumers")
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
class TaskCreateSuperTable(StateTransitionTask):
|
class TaskCreateSuperTable(StateTransitionTask):
|
||||||
|
@ -2048,7 +2054,10 @@ class TdSuperTable:
|
||||||
topic_list = TaosTmqList()
|
topic_list = TaosTmqList()
|
||||||
for topic in current_topic_list:
|
for topic in current_topic_list:
|
||||||
topic_list.append(topic)
|
topic_list.append(topic)
|
||||||
consumer.subscribe(topic_list)
|
try:
|
||||||
|
consumer.subscribe(topic_list)
|
||||||
|
except TmqError as e :
|
||||||
|
pass
|
||||||
time.sleep(5) # consumer work only 5 sec ,and then it will exit
|
time.sleep(5) # consumer work only 5 sec ,and then it will exit
|
||||||
try:
|
try:
|
||||||
consumer.unsubscribe()
|
consumer.unsubscribe()
|
||||||
|
@ -3326,4 +3335,3 @@ class Container():
|
||||||
return
|
return
|
||||||
self._verifyValidProperty(name)
|
self._verifyValidProperty(name)
|
||||||
self._cargo[name] = value
|
self._cargo[name] = value
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue