From 6768ae81a68b0bc4b1c5829f557efc29fc9d93dc Mon Sep 17 00:00:00 2001 From: Steven Li Date: Tue, 28 Apr 2020 00:12:53 -0700 Subject: [PATCH] Now each WorkerThread provides its own SQL execution, but still using a shared db connection --- .../python/linux/python3/taos/cursor.py | 9 + tests/pytest/random_walk.py | 245 ++++++++++-------- 2 files changed, 141 insertions(+), 113 deletions(-) diff --git a/src/connector/python/linux/python3/taos/cursor.py b/src/connector/python/linux/python3/taos/cursor.py index 7a0ae767bc..eee03770ca 100644 --- a/src/connector/python/linux/python3/taos/cursor.py +++ b/src/connector/python/linux/python3/taos/cursor.py @@ -1,6 +1,8 @@ from .cinterface import CTaosInterface from .error import * +querySeqNum = 0 + class TDengineCursor(object): """Database cursor which is used to manage the context of a fetch operation. @@ -109,7 +111,14 @@ class TDengineCursor(object): if params is not None: pass + + # global querySeqNum + # querySeqNum += 1 + # localSeqNum = querySeqNum # avoid raice condition + # print(" >> Exec Query ({}): {}".format(localSeqNum, str(stmt))) res = CTaosInterface.query(self._connection._conn, stmt) + # print(" << Query ({}) Exec Done".format(localSeqNum)) + if res == 0: if CTaosInterface.fieldsCount(self._connection._conn) == 0: self._affected_rows += CTaosInterface.affectedRows(self._connection._conn) diff --git a/tests/pytest/random_walk.py b/tests/pytest/random_walk.py index 6c4b5c8eec..5125d28ec0 100755 --- a/tests/pytest/random_walk.py +++ b/tests/pytest/random_walk.py @@ -62,26 +62,35 @@ def runThread(workerThread): class WorkerThread: - def __init__(self, pool, tid): # note: main thread context! + def __init__(self, pool, tid, dbState): # note: main thread context! self.curStep = -1 self.pool = pool self.tid = tid + self.dbState = dbState # self.threadIdent = threading.get_ident() self.thread = threading.Thread(target=runThread, args=(self,)) self.stepGate = threading.Event() + # Let us have a DB connection of our own + self._dbConn = DbConn() + + def start(self): self.thread.start() # AFTER the thread is recorded def run(self): # initialization after thread starts, in the thread context # self.isSleeping = False + self._dbConn.open() while self.curStep < self.pool.maxSteps: # stepNo = self.pool.waitForStep() # Step to run self.crossStepGate() # self.curStep will get incremented self.doWork() + # clean up + self._dbConn.close() + def verifyThreadSelf(self): # ensure we are called by this own thread if ( threading.get_ident() != self.thread.ident ): raise RuntimeError("Unexpectly called from other threads") @@ -136,7 +145,10 @@ class WorkerThread: def doWork(self): logger.info(" Step {}, thread {}: ".format(self.curStep, self.tid)) - self.pool.dispatcher.doWork() + self.pool.dispatcher.doWork(self) + + def execSql(self, sql): + return self.dbState.execSql(sql) # We define a class to run a number of threads in locking steps. @@ -158,10 +170,9 @@ class SteppingThreadPool: # self.mainGate = threading.Condition() # starting to run all the threads, in locking steps - def run(self): - # Create the threads - for tid in range(0, self.numThreads): - workerThread = WorkerThread(self, tid) + def run(self): + for tid in range(0, self.numThreads): # Create the threads + workerThread = WorkerThread(self, tid, dbState) self.threadList.append(workerThread) workerThread.start() # start, but should block immediately before step 0 @@ -169,15 +180,10 @@ class SteppingThreadPool: self.curStep = -1 # not started yet while(self.curStep < self.maxSteps): logger.debug("Main thread going to sleep") - # self.mainGate.acquire() - # self.mainGate.wait() # start snoozing - # self.mainGate.release self.crossPoolBarrier() - self.barrier.reset() # Other worker threads should now be at the "gate" - + self.barrier.reset() # Other worker threads should now be at the "gate" + logger.debug("Main thread waking up, tapping worker threads".format(self.curStep)) # Now not all threads had time to go to sleep - # time.sleep(0.01) # This is like forever - self.tapAllThreads() # The threads will run through many steps @@ -192,49 +198,8 @@ class SteppingThreadPool: self.curStep += 1 # we are about to get into next step. TODO: race condition here! logger.debug(" ") # line break logger.debug("--> Step {} starts with main thread waking up".format(self.curStep)) # Now not all threads had time to go to sleep - self.barrier.wait() - # allThreadWaiting = False - # with self.lock: - # self.numWaitingThreads += 1 - # if ( self.numWaitingThreads == self.numThreads ): - # allThreadWaiting = True - - # if (allThreadWaiting): # aha, pass the baton to the main thread - # logger.debug("All threads are now waiting") - # self.numWaitingThreads = 0 # do this 1st to avoid race condition - # # time.sleep(0.001) # thread yield, so main thread can be ready - # self.mainGate.acquire() - # self.mainGate.notify() # main thread would now start to run - # self.mainGate.release() - # time.sleep(0) # yield, maybe main thread can run for just a bit - - # def waitForStep(self): - # shouldWait = True; - # with self.lock: - # # if ( self.numWaitingThreads == 0 ): # first one here - # # self.stepGate.acquire() # acquire the underlying lock - - # self.numWaitingThreads += 1 - # # if ( self.numWaitingThreads < self.numThreads ): - # # do nothing, we should wait - # if ( self.numWaitingThreads == self.numThreads ): - # shouldWait = False # we should now wake up - # elif ( self.numWaitingThreads > self.numThreads ): - # raise RuntimeError("Corrupt state") - - # self.stepGate.acquire() - # if (shouldWait): - # self.stepGate.wait() - # else: - # self.numWaitingThreads = 0 # fresh start - # self.curStep += 1 # do this before letting all threads loose - # print("--> Starting step {}".format(self.curStep), end="\r\n") # before notify_all - # # self.stepGate.notify_all() - # self.wakeUpAll() - # self.stepGate.release() - # return self.curStep def tapAllThreads(self): # in a deterministic manner wakeSeq = [] @@ -254,23 +219,32 @@ class LinearQueue(): def __init__(self): self.firstIndex = 1 # 1st ever element self.lastIndex = 0 - self.lock = threading.RLock() # our functions may call each other + self._lock = threading.RLock() # our functions may call each other self.inUse = set() # the indexes that are in use right now - def push(self): # Push to the tail (largest) - with self.lock: - if ( self.firstIndex > self.lastIndex ): # impossible, meaning it's empty - self.lastIndex = self.firstIndex - return self.firstIndex + def toText(self): + return "[{}..{}], in use: {}".format(self.firstIndex, self.lastIndex, self.inUse) + + # Push (add new element, largest) to the tail, and mark it in use + def push(self): + with self._lock: + # if ( self.isEmpty() ): + # self.lastIndex = self.firstIndex + # return self.firstIndex # Otherwise we have something self.lastIndex += 1 + self.allocate(self.lastIndex) + # self.inUse.add(self.lastIndex) # mark it in use immediately return self.lastIndex def pop(self): - with self.lock: + with self._lock: if ( self.isEmpty() ): raise RuntimeError("Cannot pop an empty queue") index = self.firstIndex + if ( index in self.inUse ): + self.inUse.remove(index) # TODO: what about discard? + self.firstIndex += 1 return index @@ -278,113 +252,155 @@ class LinearQueue(): return self.firstIndex > self.lastIndex def popIfNotEmpty(self): - with self.lock: + with self._lock: if (self.isEmpty()): return 0 return self.pop() def allocate(self, i): - with self.lock: + with self._lock: if ( i in self.inUse ): raise RuntimeError("Cannot re-use same index in queue: {}".format(i)) self.inUse.add(i) def release(self, i): - with self.lock: + with self._lock: self.inUse.remove(i) # KeyError possible def size(self): return self.lastIndex + 1 - self.firstIndex def pickAndAllocate(self): - with self.lock: + if ( self.isEmpty() ): + return None + with self._lock: cnt = 0 # counting the interations while True: cnt += 1 if ( cnt > self.size()*10 ): # 10x iteration already - raise RuntimeError("Failed to allocate LinearQueue element") + # raise RuntimeError("Failed to allocate LinearQueue element") + return None ret = Dice.throwRange(self.firstIndex, self.lastIndex+1) if ( not ret in self.inUse ): - return self.allocate(ret) + self.allocate(ret) + return ret +class DbConn: + def __init__(self): + self.isOpen = False + + def open(self): # Open connection + if ( self.isOpen ): + raise RuntimeError("Cannot re-open an existing DB connection") + + cfgPath = "../../build/test/cfg" + conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable + + self._tdSql = TDSql() + self._tdSql.init(conn.cursor()) + self.isOpen = True + + def resetDb(self): # reset the whole database, etc. + if ( not self.isOpen ): + raise RuntimeError("Cannot reset database until connection is open") + self._tdSql.prepare() # Recreate database, etc. + # tdSql.execute('show databases') + + def close(self): + if ( not self.isOpen ): + raise RuntimeError("Cannot clean up database until connection is open") + self._tdSql.close() + self.isOpen = False + + def execSql(self, sql): + return self._tdSql.execute(sql) # State of the database as we believe it to be class DbState(): def __init__(self): self.tableNumQueue = LinearQueue() - self.tick = datetime.datetime(2019, 1, 1) # initial date time tick - self.int = 0 # initial integer - self.openDbServerConnection() - self.lock = threading.RLock() + self._lastTick = datetime.datetime(2019, 1, 1) # initial date time tick + self._lastInt = 0 # next one is initial integer + self._lock = threading.RLock() + + # self.openDbServerConnection() + self._dbConn = DbConn() + self._dbConn.open() + self._dbConn.resetDb() # drop and recreate DB def pickAndAllocateTable(self): # pick any table, and "use" it return self.tableNumQueue.pickAndAllocate() + def addTable(self): + with self._lock: + tIndex = self.tableNumQueue.push() + return tIndex + def releaseTable(self, i): # return the table back, so others can use it self.tableNumQueue.release(i) - def getNextTick(self): - with self.lock: # prevent duplicate tick - self.tick += datetime.timedelta(0, 1) # add one second to it - return self.tick + with self._lock: # prevent duplicate tick + self._lastTick += datetime.timedelta(0, 1) # add one second to it + return self._lastTick def getNextInt(self): - with self.lock: - self.int += 1 - return self.int - - def openDbServerConnection(self): - cfgPath = "../../build/test/cfg" # was: tdDnodes.getSimCfgPath() - conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable - - tdSql.init(conn.cursor()) - tdSql.prepare() # Recreate database, etc. - # tdSql.execute('show databases') - - def closeDbServerConnection(self): - tdSql.close() - tdLog.info("Disconnecting from database server") - - def getTableNameToCreate(self): - tblNum = self.tableNumQueue.push() - return "table_{}".format(tblNum) - + with self._lock: + self._lastInt += 1 + return self._lastInt + def getTableNameToDelete(self): if self.tableNumQueue.isEmpty(): return False tblNum = self.tableNumQueue.pop() # TODO: race condition! return "table_{}".format(tblNum) + def execSql(self, sql): # using the main DB connection + return self._dbConn.execSql(sql) + + def cleanUp(self): + self._dbConn.close() + class Task(): def __init__(self, dbState): self.dbState = dbState - def execute(self): + def execute(self, workerThread): raise RuntimeError("Must be overriden by child class") + def execSql(self, sql): + return self.dbState.execute(sql) + class CreateTableTask(Task): - def execute(self): - tableName = dbState.getTableNameToCreate() - logger.info(" Creating a table {} ...".format(tableName)) - tdSql.execute("create table {} (ts timestamp, speed int)".format(tableName)) + def execute(self, wt): + tIndex = dbState.addTable() + logger.debug(" Creating a table {} ...".format(tIndex)) + wt.execSql("create table table_{} (ts timestamp, speed int)".format(tIndex)) + logger.debug(" Table {} created.".format(tIndex)) + dbState.releaseTable(tIndex) class DropTableTask(Task): - def execute(self): + def execute(self, wt): tableName = dbState.getTableNameToDelete() if ( not tableName ): # May be "False" - logger.info("Cannot generate a table to delete, skipping...") + logger.info(" Cannot generate a table to delete, skipping...") return logger.info(" Dropping a table {} ...".format(tableName)) - tdSql.execute("drop table {}".format(tableName)) + wt.execSql("drop table {}".format(tableName)) class AddDataTask(Task): - def execute(self): - logger.info(" Adding some data...") + def execute(self, wt): ds = self.dbState - tIndex = ds.pickTable() - tdSql.execute("insert into table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())) - ds.r + logger.info(" Adding some data... numQueue={}".format(ds.tableNumQueue.toText())) + tIndex = ds.pickAndAllocateTable() + if ( tIndex == None ): + logger.info(" No table found to add data, skipping...") + return + sql = "insert into table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) + logger.debug(" Executing SQL: {}".format(sql)) + wt.execSql(sql) + ds.releaseTable(tIndex) + logger.debug(" Finished adding data") # Deterministic random number generator class Dice(): @@ -430,12 +446,15 @@ class WorkDispatcher(): ] def throwDice(self): - return random.randint(0, len(self.tasks) - 1) + max = len(self.tasks) - 1 + dRes = random.randint(0, max) + # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes)) + return dRes - def doWork(self): + def doWork(self, workerThread): dice = self.throwDice() task = self.tasks[dice] - task.execute() + task.execute(workerThread) if __name__ == "__main__": logger = logging.getLogger('myApp') @@ -445,8 +464,8 @@ if __name__ == "__main__": Dice.seed(0) # initial seeding of dice dbState = DbState() - threadPool = SteppingThreadPool(dbState, 1, 5, 0) + threadPool = SteppingThreadPool(dbState, 3, 5, 0) threadPool.run() logger.info("Finished running thread pool") - dbState.closeDbServerConnection() + dbState.cleanUp()