Added overlaping data insertion, plus occasional db connection drops
This commit is contained in:
parent
2e2a3f10e1
commit
ac1fb970d5
|
@ -32,6 +32,7 @@ import textwrap
|
||||||
|
|
||||||
from typing import List
|
from typing import List
|
||||||
from typing import Dict
|
from typing import Dict
|
||||||
|
from typing import Set
|
||||||
|
|
||||||
from util.log import *
|
from util.log import *
|
||||||
from util.dnodes import *
|
from util.dnodes import *
|
||||||
|
@ -1136,6 +1137,10 @@ class ReadFixedDataTask(StateTransitionTask):
|
||||||
sTbName = self._dbState.getFixedSuperTableName()
|
sTbName = self._dbState.getFixedSuperTableName()
|
||||||
dbc = wt.getDbConn()
|
dbc = wt.getDbConn()
|
||||||
dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later
|
dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later
|
||||||
|
if random.randrange(5) == 0 : # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations
|
||||||
|
dbc.close()
|
||||||
|
dbc.open()
|
||||||
|
else:
|
||||||
rTables = dbc.getQueryResult()
|
rTables = dbc.getQueryResult()
|
||||||
# print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
|
# print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
|
||||||
for rTbName in rTables : # regular tables
|
for rTbName in rTables : # regular tables
|
||||||
|
@ -1160,6 +1165,8 @@ class DropFixedSuperTableTask(StateTransitionTask):
|
||||||
wt.execSql("drop table db.{}".format(tblName))
|
wt.execSql("drop table db.{}".format(tblName))
|
||||||
|
|
||||||
class AddFixedDataTask(StateTransitionTask):
|
class AddFixedDataTask(StateTransitionTask):
|
||||||
|
activeTable : Set[int] = set() # Track which table is being actively worked on
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def getInfo(cls):
|
def getInfo(cls):
|
||||||
return [
|
return [
|
||||||
|
@ -1174,14 +1181,24 @@ class AddFixedDataTask(StateTransitionTask):
|
||||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||||
ds = self._dbState
|
ds = self._dbState
|
||||||
wt.execSql("use db") # TODO: seems to be an INSERT bug to require this
|
wt.execSql("use db") # TODO: seems to be an INSERT bug to require this
|
||||||
for i in range(35 if gConfig.larger_data else 2): # number of regular tables in the super table
|
tblSeq = list(range(35 if gConfig.larger_data else 2))
|
||||||
for j in range(100 if gConfig.larger_data else 2) : # number of records per table
|
random.shuffle(tblSeq)
|
||||||
|
for i in tblSeq:
|
||||||
|
if ( i in self.activeTable ): # wow already active
|
||||||
|
# logger.info("Concurrent data insertion into table: {}".format(i))
|
||||||
|
# print("ct({})".format(i), end="", flush=True) # Concurrent insertion into table
|
||||||
|
print("x", end="", flush=True)
|
||||||
|
else:
|
||||||
|
self.activeTable.add(i) # marking it active
|
||||||
|
# No need to shuffle data sequence, unless later we decide to do non-increment insertion
|
||||||
|
for j in range(50 if gConfig.larger_data else 2) : # number of records per table
|
||||||
sql = "insert into db.reg_table_{} using {} tags ('{}', {}) values ('{}', {});".format(
|
sql = "insert into db.reg_table_{} using {} tags ('{}', {}) values ('{}', {});".format(
|
||||||
i,
|
i,
|
||||||
ds.getFixedSuperTableName(),
|
ds.getFixedSuperTableName(),
|
||||||
ds.getNextBinary(), ds.getNextFloat(),
|
ds.getNextBinary(), ds.getNextFloat(),
|
||||||
ds.getNextTick(), ds.getNextInt())
|
ds.getNextTick(), ds.getNextInt())
|
||||||
wt.execSql(sql)
|
wt.execSql(sql)
|
||||||
|
self.activeTable.discard(i) # not raising an error, unlike remove
|
||||||
|
|
||||||
|
|
||||||
#---------- Non State-Transition Related Tasks ----------#
|
#---------- Non State-Transition Related Tasks ----------#
|
||||||
|
|
Loading…
Reference in New Issue