Added -w option to crash_gen tool to write duplicate data to shadow database
This commit is contained in:
parent
a25af67190
commit
bbf7b47e29
|
@ -41,10 +41,13 @@ import gc
|
|||
from crash_gen.service_manager import ServiceManager, TdeInstance
|
||||
from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress
|
||||
from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager
|
||||
import crash_gen.settings
|
||||
|
||||
import taos
|
||||
import requests
|
||||
|
||||
crash_gen.settings.init()
|
||||
|
||||
# Require Python 3
|
||||
if sys.version_info[0] < 3:
|
||||
raise Exception("Must be using Python 3")
|
||||
|
@ -259,6 +262,7 @@ class ThreadCoordinator:
|
|||
self._execStats = ExecutionStats()
|
||||
self._runStatus = Status.STATUS_RUNNING
|
||||
self._initDbs()
|
||||
self._stepStartTime = None # Track how long it takes to execute each step
|
||||
|
||||
def getTaskExecutor(self):
|
||||
return self._te
|
||||
|
@ -394,6 +398,10 @@ class ThreadCoordinator:
|
|||
try:
|
||||
self._syncAtBarrier() # For now just cross the barrier
|
||||
Progress.emit(Progress.END_THREAD_STEP)
|
||||
if self._stepStartTime :
|
||||
stepExecTime = time.time() - self._stepStartTime
|
||||
Progress.emitStr('{:.3f}s/{}'.format(stepExecTime, DbConnNative.totalRequests))
|
||||
DbConnNative.resetTotalRequests() # reset to zero
|
||||
except threading.BrokenBarrierError as err:
|
||||
self._execStats.registerFailure("Aborted due to worker thread timeout")
|
||||
Logging.error("\n")
|
||||
|
@ -433,6 +441,7 @@ class ThreadCoordinator:
|
|||
|
||||
# Then we move on to the next step
|
||||
Progress.emit(Progress.BEGIN_THREAD_STEP)
|
||||
self._stepStartTime = time.time()
|
||||
self._releaseAllWorkerThreads(transitionFailed)
|
||||
|
||||
if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate"
|
||||
|
@ -691,7 +700,7 @@ class AnyState:
|
|||
def canDropDb(self):
|
||||
# If user requests to run up to a number of DBs,
|
||||
# we'd then not do drop_db operations any more
|
||||
if gConfig.max_dbs > 0 :
|
||||
if gConfig.max_dbs > 0 or gConfig.use_shadow_db :
|
||||
return False
|
||||
return self._info[self.CAN_DROP_DB]
|
||||
|
||||
|
@ -699,6 +708,8 @@ class AnyState:
|
|||
return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]
|
||||
|
||||
def canDropFixedSuperTable(self):
|
||||
if gConfig.use_shadow_db: # duplicate writes to shaddow DB, in which case let's disable dropping s-table
|
||||
return False
|
||||
return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]
|
||||
|
||||
def canAddData(self):
|
||||
|
@ -1037,7 +1048,7 @@ class Database:
|
|||
_clsLock = threading.Lock() # class wide lock
|
||||
_lastInt = 101 # next one is initial integer
|
||||
_lastTick = 0
|
||||
_lastLaggingTick = 0 # lagging tick, for unsequenced insersions
|
||||
_lastLaggingTick = 0 # lagging tick, for out-of-sequence (oos) data insertions
|
||||
|
||||
def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc
|
||||
self._dbNum = dbNum # we assign a number to databases, for our testing purpose
|
||||
|
@ -1093,21 +1104,24 @@ class Database:
|
|||
t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years
|
||||
t4 = datetime.datetime.fromtimestamp(
|
||||
t3.timestamp() + elSec2) # see explanation above
|
||||
Logging.debug("Setting up TICKS to start from: {}".format(t4))
|
||||
Logging.info("Setting up TICKS to start from: {}".format(t4))
|
||||
return t4
|
||||
|
||||
@classmethod
|
||||
def getNextTick(cls):
|
||||
'''
|
||||
Fetch a timestamp tick, with some random factor, may not be unique.
|
||||
'''
|
||||
with cls._clsLock: # prevent duplicate tick
|
||||
if cls._lastLaggingTick==0 or cls._lastTick==0 : # not initialized
|
||||
# 10k at 1/20 chance, should be enough to avoid overlaps
|
||||
tick = cls.setupLastTick()
|
||||
cls._lastTick = tick
|
||||
cls._lastLaggingTick = tick + datetime.timedelta(0, -10000)
|
||||
cls._lastLaggingTick = tick + datetime.timedelta(0, -60*2) # lagging behind 2 minutes, should catch up fast
|
||||
# if : # should be quite a bit into the future
|
||||
|
||||
if Dice.throw(20) == 0: # 1 in 20 chance, return lagging tick
|
||||
cls._lastLaggingTick += datetime.timedelta(0, 1) # Go back in time 100 seconds
|
||||
if gConfig.mix_oos_data and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick
|
||||
cls._lastLaggingTick += datetime.timedelta(0, 1) # pick the next sequence from the lagging tick sequence
|
||||
return cls._lastLaggingTick
|
||||
else: # regular
|
||||
# add one second to it
|
||||
|
@ -1334,7 +1348,8 @@ class Task():
|
|||
elif self._isErrAcceptable(errno2, err.__str__()):
|
||||
self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
|
||||
errno2, err, wt.getDbConn().getLastSql()))
|
||||
print("_", end="", flush=True)
|
||||
# print("_", end="", flush=True)
|
||||
Progress.emit(Progress.ACCEPTABLE_ERROR)
|
||||
self._err = err
|
||||
else: # not an acceptable error
|
||||
errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format(
|
||||
|
@ -1563,8 +1578,11 @@ class TaskCreateDb(StateTransitionTask):
|
|||
# numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N
|
||||
numReplica = gConfig.max_replicas # fixed, always
|
||||
repStr = "replica {}".format(numReplica)
|
||||
self.execWtSql(wt, "create database {} {}"
|
||||
.format(self._db.getName(), repStr) )
|
||||
updatePostfix = "update 1" if gConfig.verify_data else "" # allow update only when "verify data" is active
|
||||
dbName = self._db.getName()
|
||||
self.execWtSql(wt, "create database {} {} {} ".format(dbName, repStr, updatePostfix ) )
|
||||
if dbName == "db_0" and gConfig.use_shadow_db:
|
||||
self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix ) )
|
||||
|
||||
class TaskDropDb(StateTransitionTask):
|
||||
@classmethod
|
||||
|
@ -1988,7 +2006,7 @@ class TaskAddData(StateTransitionTask):
|
|||
numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
|
||||
fullTableName = db.getName() + '.' + regTableName
|
||||
|
||||
sql = "insert into {} values ".format(fullTableName)
|
||||
sql = "INSERT INTO {} VALUES ".format(fullTableName)
|
||||
for j in range(numRecords): # number of records per table
|
||||
nextInt = db.getNextInt()
|
||||
nextTick = db.getNextTick()
|
||||
|
@ -2016,12 +2034,24 @@ class TaskAddData(StateTransitionTask):
|
|||
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
|
||||
|
||||
try:
|
||||
sql = "insert into {} values ('{}', {}, '{}');".format( # removed: tags ('{}', {})
|
||||
sql = "INSERT INTO {} VALUES ('{}', {}, '{}');".format( # removed: tags ('{}', {})
|
||||
fullTableName,
|
||||
# ds.getFixedSuperTableName(),
|
||||
# ds.getNextBinary(), ds.getNextFloat(),
|
||||
nextTick, nextInt, nextColor)
|
||||
dbc.execute(sql)
|
||||
|
||||
# Quick hack, attach an update statement here. TODO: create an "update" task
|
||||
if (not gConfig.use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB
|
||||
nextInt = db.getNextInt()
|
||||
nextColor = db.getNextColor()
|
||||
sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here
|
||||
fullTableName,
|
||||
nextTick, nextInt, nextColor)
|
||||
# sql = "UPDATE {} set speed={}, color='{}' WHERE ts='{}'".format(
|
||||
# fullTableName, db.getNextInt(), db.getNextColor(), nextTick)
|
||||
dbc.execute(sql)
|
||||
|
||||
except: # Any exception at all
|
||||
if gConfig.verify_data:
|
||||
self.unlockTable(fullTableName)
|
||||
|
@ -2070,7 +2100,8 @@ class TaskAddData(StateTransitionTask):
|
|||
random.shuffle(tblSeq) # now we have random sequence
|
||||
for i in tblSeq:
|
||||
if (i in self.activeTable): # wow already active
|
||||
print("x", end="", flush=True) # concurrent insertion
|
||||
# print("x", end="", flush=True) # concurrent insertion
|
||||
Progress.emit(Progress.CONCURRENT_INSERTION)
|
||||
else:
|
||||
self.activeTable.add(i) # marking it active
|
||||
|
||||
|
@ -2373,6 +2404,11 @@ class MainExec:
|
|||
'--larger-data',
|
||||
action='store_true',
|
||||
help='Write larger amount of data during write operations (default: false)')
|
||||
parser.add_argument(
|
||||
'-m',
|
||||
'--mix-oos-data',
|
||||
action='store_false',
|
||||
help='Mix out-of-sequence data into the test data stream (default: true)')
|
||||
parser.add_argument(
|
||||
'-n',
|
||||
'--dynamic-db-table-names',
|
||||
|
@ -2414,6 +2450,11 @@ class MainExec:
|
|||
'--verify-data',
|
||||
action='store_true',
|
||||
help='Verify data written in a number of places by reading back (default: false)')
|
||||
parser.add_argument(
|
||||
'-w',
|
||||
'--use-shadow-db',
|
||||
action='store_true',
|
||||
help='Use a shaddow database to verify data integrity (default: false)')
|
||||
parser.add_argument(
|
||||
'-x',
|
||||
'--continue-on-exception',
|
||||
|
@ -2422,6 +2463,11 @@ class MainExec:
|
|||
|
||||
global gConfig
|
||||
gConfig = parser.parse_args()
|
||||
crash_gen.settings.gConfig = gConfig # TODO: fix this hack, consolidate this global var
|
||||
|
||||
# Sanity check for arguments
|
||||
if gConfig.use_shadow_db and gConfig.max_dbs>1 :
|
||||
raise CrashGenError("Cannot combine use-shadow-db with max-dbs of more than 1")
|
||||
|
||||
Logging.clsInit(gConfig)
|
||||
|
||||
|
|
|
@ -18,6 +18,8 @@ import datetime
|
|||
import traceback
|
||||
# from .service_manager import TdeInstance
|
||||
|
||||
import crash_gen.settings
|
||||
|
||||
class DbConn:
|
||||
TYPE_NATIVE = "native-c"
|
||||
TYPE_REST = "rest-api"
|
||||
|
@ -257,6 +259,27 @@ class MyTDSql:
|
|||
cls.longestQuery = sql
|
||||
cls.longestQueryTime = queryTime
|
||||
cls.lqStartTime = startTime
|
||||
|
||||
# Now write to the shadow database
|
||||
if crash_gen.settings.gConfig.use_shadow_db:
|
||||
if sql[:11] == "INSERT INTO":
|
||||
if sql[:16] == "INSERT INTO db_0":
|
||||
sql2 = "INSERT INTO db_s" + sql[16:]
|
||||
self._cursor.execute(sql2)
|
||||
else:
|
||||
raise CrashGenError("Did not find db_0 in INSERT statement: {}".format(sql))
|
||||
else: # not an insert statement
|
||||
pass
|
||||
|
||||
if sql[:12] == "CREATE TABLE":
|
||||
if sql[:17] == "CREATE TABLE db_0":
|
||||
sql2 = sql.replace('db_0', 'db_s')
|
||||
self._cursor.execute(sql2)
|
||||
else:
|
||||
raise CrashGenError("Did not find db_0 in CREATE TABLE statement: {}".format(sql))
|
||||
else: # not an insert statement
|
||||
pass
|
||||
|
||||
return ret
|
||||
|
||||
def query(self, sql):
|
||||
|
@ -302,6 +325,7 @@ class DbConnNative(DbConn):
|
|||
_lock = threading.Lock()
|
||||
# _connInfoDisplayed = False # TODO: find another way to display this
|
||||
totalConnections = 0 # Not private
|
||||
totalRequests = 0
|
||||
|
||||
def __init__(self, dbTarget):
|
||||
super().__init__(dbTarget)
|
||||
|
@ -309,6 +333,11 @@ class DbConnNative(DbConn):
|
|||
self._conn = None
|
||||
# self._cursor = None
|
||||
|
||||
@classmethod
|
||||
def resetTotalRequests(cls):
|
||||
with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!!
|
||||
cls.totalRequests = 0
|
||||
|
||||
def openByType(self): # Open connection
|
||||
# global gContainer
|
||||
# tInst = tInst or gContainer.defTdeInstance # set up in ClientManager, type: TdeInstance
|
||||
|
@ -356,6 +385,8 @@ class DbConnNative(DbConn):
|
|||
Logging.debug("[SQL] Executing SQL: {}".format(sql))
|
||||
self._lastSql = sql
|
||||
nRows = self._tdSql.execute(sql)
|
||||
cls = self.__class__
|
||||
cls.totalRequests += 1
|
||||
Logging.debug(
|
||||
"[SQL] Execution Result, nRows = {}, SQL = {}".format(
|
||||
nRows, sql))
|
||||
|
@ -369,6 +400,8 @@ class DbConnNative(DbConn):
|
|||
Logging.debug("[SQL] Executing SQL: {}".format(sql))
|
||||
self._lastSql = sql
|
||||
nRows = self._tdSql.query(sql)
|
||||
cls = self.__class__
|
||||
cls.totalRequests += 1
|
||||
Logging.debug(
|
||||
"[SQL] Query Result, nRows = {}, SQL = {}".format(
|
||||
nRows, sql))
|
||||
|
|
|
@ -176,6 +176,8 @@ class Progress:
|
|||
SERVICE_START_NAP = 7
|
||||
CREATE_TABLE_ATTEMPT = 8
|
||||
QUERY_GROUP_BY = 9
|
||||
CONCURRENT_INSERTION = 10
|
||||
ACCEPTABLE_ERROR = 11
|
||||
|
||||
tokens = {
|
||||
STEP_BOUNDARY: '.',
|
||||
|
@ -188,8 +190,14 @@ class Progress:
|
|||
SERVICE_START_NAP: '_zz',
|
||||
CREATE_TABLE_ATTEMPT: 'c',
|
||||
QUERY_GROUP_BY: 'g',
|
||||
CONCURRENT_INSERTION: 'x',
|
||||
ACCEPTABLE_ERROR: '_',
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def emit(cls, token):
|
||||
print(cls.tokens[token], end="", flush=True)
|
||||
|
||||
@classmethod
|
||||
def emitStr(cls, str):
|
||||
print('({})'.format(str), end="", flush=True)
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
from __future__ import annotations
|
||||
import argparse
|
||||
|
||||
gConfig: argparse.Namespace
|
||||
|
||||
def init():
|
||||
global gConfig
|
||||
gConfig = []
|
Loading…
Reference in New Issue