Minor refactoring of crash_gen tool, now doing multi-record insertion some times
This commit is contained in:
parent
7688a6ebc7
commit
0d57114b77
|
@ -388,9 +388,9 @@ class ThreadCoordinator:
|
|||
self._syncAtBarrier() # For now just cross the barrier
|
||||
Progress.emit(Progress.END_THREAD_STEP)
|
||||
except threading.BrokenBarrierError as err:
|
||||
Logging.info("Main loop aborted, caused by worker thread time-out")
|
||||
Logging.info("Main loop aborted, caused by worker thread(s) time-out")
|
||||
self._execStats.registerFailure("Aborted due to worker thread timeout")
|
||||
print("\n\nWorker Thread time-out detected, important thread info:")
|
||||
print("\n\nWorker Thread time-out detected, TAOS related threads are:")
|
||||
ts = ThreadStacks()
|
||||
ts.print(filterInternal=True)
|
||||
workerTimeout = True
|
||||
|
@ -1242,6 +1242,7 @@ class Task():
|
|||
0x0B, # Unable to establish connection, more details in TD-1648
|
||||
0x200, # invalid SQL, TODO: re-examine with TD-934
|
||||
0x20F, # query terminated, possibly due to vnoding being dropped, see TD-1776
|
||||
0x213, # "Disconnected from service", result of "kill connection ???"
|
||||
0x217, # "db not selected", client side defined error code
|
||||
# 0x218, # "Table does not exist" client side defined error code
|
||||
0x360, # Table already exists
|
||||
|
@ -1911,13 +1912,88 @@ class TaskAddData(StateTransitionTask):
|
|||
def canBeginFrom(cls, state: AnyState):
|
||||
return state.canAddData()
|
||||
|
||||
def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor):
|
||||
numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
|
||||
fullTableName = db.getName() + '.' + regTableName
|
||||
|
||||
sql = "insert into {} values ".format(fullTableName)
|
||||
for j in range(numRecords): # number of records per table
|
||||
nextInt = db.getNextInt()
|
||||
nextTick = db.getNextTick()
|
||||
sql += "('{}', {});".format(nextTick, nextInt)
|
||||
dbc.execute(sql)
|
||||
|
||||
def _addData(self, db, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
|
||||
numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
|
||||
|
||||
for j in range(numRecords): # number of records per table
|
||||
nextInt = db.getNextInt()
|
||||
nextTick = db.getNextTick()
|
||||
if gConfig.record_ops:
|
||||
self.prepToRecordOps()
|
||||
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
|
||||
self.fAddLogReady.flush()
|
||||
os.fsync(self.fAddLogReady)
|
||||
|
||||
# TODO: too ugly trying to lock the table reliably, refactor...
|
||||
fullTableName = db.getName() + '.' + regTableName
|
||||
if gConfig.verify_data:
|
||||
self.lockTable(fullTableName)
|
||||
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
|
||||
|
||||
try:
|
||||
sql = "insert into {} values ('{}', {});".format( # removed: tags ('{}', {})
|
||||
fullTableName,
|
||||
# ds.getFixedSuperTableName(),
|
||||
# ds.getNextBinary(), ds.getNextFloat(),
|
||||
nextTick, nextInt)
|
||||
dbc.execute(sql)
|
||||
except: # Any exception at all
|
||||
if gConfig.verify_data:
|
||||
self.unlockTable(fullTableName)
|
||||
raise
|
||||
|
||||
# Now read it back and verify, we might encounter an error if table is dropped
|
||||
if gConfig.verify_data: # only if command line asks for it
|
||||
try:
|
||||
readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'".
|
||||
format(db.getName(), regTableName, nextTick))
|
||||
if readBack != nextInt :
|
||||
raise taos.error.ProgrammingError(
|
||||
"Failed to read back same data, wrote: {}, read: {}"
|
||||
.format(nextInt, readBack), 0x999)
|
||||
except taos.error.ProgrammingError as err:
|
||||
errno = Helper.convertErrno(err.errno)
|
||||
if errno in [0x991, 0x992] : # not a single result
|
||||
raise taos.error.ProgrammingError(
|
||||
"Failed to read back same data for tick: {}, wrote: {}, read: {}"
|
||||
.format(nextTick, nextInt, "Empty Result" if errno==0x991 else "Multiple Result"),
|
||||
errno)
|
||||
elif errno in [0x218, 0x362]: # table doesn't exist
|
||||
# do nothing
|
||||
dummy = 0
|
||||
else:
|
||||
# Re-throw otherwise
|
||||
raise
|
||||
finally:
|
||||
self.unlockTable(fullTableName) # Unlock the table no matter what
|
||||
|
||||
# Successfully wrote the data into the DB, let's record it somehow
|
||||
te.recordDataMark(nextInt)
|
||||
|
||||
if gConfig.record_ops:
|
||||
self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
|
||||
self.fAddLogDone.flush()
|
||||
os.fsync(self.fAddLogDone)
|
||||
|
||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||
# ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
|
||||
db = self._db
|
||||
dbc = wt.getDbConn()
|
||||
tblSeq = list(range(
|
||||
self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))
|
||||
random.shuffle(tblSeq)
|
||||
numTables = self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES
|
||||
numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
|
||||
tblSeq = list(range(numTables ))
|
||||
random.shuffle(tblSeq) # now we have random sequence
|
||||
for i in tblSeq:
|
||||
if (i in self.activeTable): # wow already active
|
||||
print("x", end="", flush=True) # concurrent insertion
|
||||
|
@ -1925,82 +2001,20 @@ class TaskAddData(StateTransitionTask):
|
|||
self.activeTable.add(i) # marking it active
|
||||
|
||||
sTable = db.getFixedSuperTable()
|
||||
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
|
||||
|
||||
|
||||
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
|
||||
fullTableName = db.getName() + '.' + regTableName
|
||||
# self._lockTable(fullTableName) # "create table" below. Stop it if the table is "locked"
|
||||
sTable.ensureTable(self, wt.getDbConn(), db.getName(), regTableName) # Ensure the table exists
|
||||
# self._unlockTable(fullTableName)
|
||||
|
||||
for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS): # number of records per table
|
||||
nextInt = db.getNextInt()
|
||||
nextTick = db.getNextTick()
|
||||
if gConfig.record_ops:
|
||||
self.prepToRecordOps()
|
||||
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
|
||||
self.fAddLogReady.flush()
|
||||
os.fsync(self.fAddLogReady)
|
||||
|
||||
# TODO: too ugly trying to lock the table reliably, refactor...
|
||||
fullTableName = db.getName() + '.' + regTableName
|
||||
if gConfig.verify_data:
|
||||
self.lockTable(fullTableName)
|
||||
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
|
||||
|
||||
try:
|
||||
sql = "insert into {} values ('{}', {});".format( # removed: tags ('{}', {})
|
||||
fullTableName,
|
||||
# ds.getFixedSuperTableName(),
|
||||
# ds.getNextBinary(), ds.getNextFloat(),
|
||||
nextTick, nextInt)
|
||||
dbc.execute(sql)
|
||||
except: # Any exception at all
|
||||
if gConfig.verify_data:
|
||||
self.unlockTable(fullTableName)
|
||||
raise
|
||||
|
||||
# Now read it back and verify, we might encounter an error if table is dropped
|
||||
if gConfig.verify_data: # only if command line asks for it
|
||||
try:
|
||||
readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'".
|
||||
format(db.getName(), regTableName, nextTick))
|
||||
if readBack != nextInt :
|
||||
raise taos.error.ProgrammingError(
|
||||
"Failed to read back same data, wrote: {}, read: {}"
|
||||
.format(nextInt, readBack), 0x999)
|
||||
except taos.error.ProgrammingError as err:
|
||||
errno = Helper.convertErrno(err.errno)
|
||||
if errno in [0x991, 0x992] : # not a single result
|
||||
raise taos.error.ProgrammingError(
|
||||
"Failed to read back same data for tick: {}, wrote: {}, read: {}"
|
||||
.format(nextTick, nextInt, "Empty Result" if errno==0x991 else "Multiple Result"),
|
||||
errno)
|
||||
elif errno in [0x218, 0x362]: # table doesn't exist
|
||||
# do nothing
|
||||
dummy = 0
|
||||
else:
|
||||
# Re-throw otherwise
|
||||
raise
|
||||
finally:
|
||||
self.unlockTable(fullTableName) # Unlock the table no matter what
|
||||
|
||||
# Successfully wrote the data into the DB, let's record it somehow
|
||||
te.recordDataMark(nextInt)
|
||||
|
||||
if gConfig.record_ops:
|
||||
self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
|
||||
self.fAddLogDone.flush()
|
||||
os.fsync(self.fAddLogDone)
|
||||
|
||||
|
||||
if Dice.throw(1) == 0: # 1 in 2 chance
|
||||
self._addData(db, dbc, regTableName, te)
|
||||
else:
|
||||
self._addDataInBatch(db, dbc, regTableName, te)
|
||||
|
||||
self.activeTable.discard(i) # not raising an error, unlike remove
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class ThreadStacks: # stack info for all threads
|
||||
def __init__(self):
|
||||
self._allStacks = {}
|
||||
|
@ -2022,13 +2036,14 @@ class ThreadStacks: # stack info for all threads
|
|||
'__init__']: # the thread that extracted the stack
|
||||
continue # ignore
|
||||
# Now print
|
||||
print("\n<----- Thread Info for ID: {}".format(thNid))
|
||||
print("\n<----- Thread Info for LWP/ID: {} (Execution stopped at Bottom Frame) <-----".format(thNid))
|
||||
stackFrame = 0
|
||||
for frame in stack:
|
||||
# print(frame)
|
||||
print("File {filename}, line {lineno}, in {name}".format(
|
||||
filename=frame.filename, lineno=frame.lineno, name=frame.name))
|
||||
print("[{sf}] File {filename}, line {lineno}, in {name}".format(
|
||||
sf=stackFrame, filename=frame.filename, lineno=frame.lineno, name=frame.name))
|
||||
print(" {}".format(frame.line))
|
||||
print("-----> End of Thread Info\n")
|
||||
print("-----> End of Thread Info ----->\n")
|
||||
|
||||
class ClientManager:
|
||||
def __init__(self):
|
||||
|
|
Loading…
Reference in New Issue