From dad9f027e3c4c00ae7b53c72501cf5af8641b8f4 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Wed, 22 Apr 2020 15:46:54 -0700 Subject: [PATCH 01/17] minor tweak --- tests/pytest/simpletest_no_sudo.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/pytest/simpletest_no_sudo.sh b/tests/pytest/simpletest_no_sudo.sh index 61faf3df52..135a7d8814 100755 --- a/tests/pytest/simpletest_no_sudo.sh +++ b/tests/pytest/simpletest_no_sudo.sh @@ -10,4 +10,5 @@ export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3 export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)/../../build/build/lib # Now we are all let, and let's run our cases! -python3 ./test.py -m 127.0.0.1 -f insert/basic.py +# python3 ./test.py -m 127.0.0.1 -f insert/basic.py +./random_walk.py From 741ba77cb2ed46fa9618328f307fa667aa6bcda0 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Thu, 23 Apr 2020 01:15:14 -0700 Subject: [PATCH 02/17] Now using deterministic dice --- .gitignore | 2 + tests/pytest/random_walk.py | 344 ++++++++++++++++++++++++++++++++++++ 2 files changed, 346 insertions(+) create mode 100755 tests/pytest/random_walk.py diff --git a/.gitignore b/.gitignore index e91d6739a5..8520c50b98 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,5 @@ Target/ *.sql sim/ +# Doxygen Generated files +html/ diff --git a/tests/pytest/random_walk.py b/tests/pytest/random_walk.py new file mode 100755 index 0000000000..5866351a99 --- /dev/null +++ b/tests/pytest/random_walk.py @@ -0,0 +1,344 @@ +#!/usr/bin/python3 +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. 
+# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- +import sys +import getopt + +import threading +import random +import logging + +from util.log import * +from util.dnodes import * +from util.cases import * +from util.sql import * + +import taos + +# Constants +LOGGING_LEVEL = logging.DEBUG + +def runThread(workerThread): + logger.info("Running Thread: {}".format(workerThread.tid)) + workerThread.run() + +class WorkerThread: + def __init__(self, pool, tid): # note: main thread context! + self.curStep = -1 + self.pool = pool + self.tid = tid + # self.threadIdent = threading.get_ident() + self.thread = threading.Thread(target=runThread, args=(self,)) + self.stepGate = threading.Condition() + + def start(self): + self.thread.start() # AFTER the thread is recorded + + def run(self): + # initialization after thread starts, in the thread context + self.isSleeping = False + + while self.curStep < self.pool.maxSteps: + # stepNo = self.pool.waitForStep() # Step to run + self.crossStepGate() # self.curStep will get incremented + self.doWork() + + def verifyThreadSelf(self): # ensure we are called by this own thread + if ( threading.get_ident() != self.thread.ident ): + raise RuntimeError("Unexpectly called from other threads") + + def verifyThreadMain(self): # ensure we are called by the main thread + if ( threading.get_ident() != threading.main_thread().ident ): + raise RuntimeError("Unexpectly called from other threads") + + def verifyThreadAlive(self): + if ( not self.thread.is_alive() ): + raise RuntimeError("Unexpected dead thread") + + def verifyIsSleeping(self, isSleeping): + if ( isSleeping != self.isSleeping ): + raise RuntimeError("Unexpected thread sleep status") + + def crossStepGate(self): + self.verifyThreadAlive() + 
self.verifyThreadSelf() # only allowed by ourselves + self.verifyIsSleeping(False) # has to be awake + + logger.debug("Worker thread {} going to sleep".format(self.tid)) + self.isSleeping = True # TODO: maybe too early? + self.pool.reportThreadWaiting() # TODO: this triggers the main thread, TOO early + + # Actually going to sleep + self.stepGate.acquire() # acquire lock immediately + self.stepGate.wait() # release and then acquire + self.stepGate.release() # release + + logger.debug("Worker thread {} woke up".format(self.tid)) + # Someone will wake us up here + self.curStep += 1 # off to a new step... + + def tapStepGate(self): # give it a tap, release the thread waiting there + self.verifyThreadAlive() + self.verifyThreadMain() # only allowed for main thread + self.verifyIsSleeping(True) # has to be sleeping + + logger.debug("Tapping worker thread {}".format(self.tid)) + self.stepGate.acquire() + # logger.debug("Tapping worker thread {}, lock acquired".format(self.tid)) + self.stepGate.notify() # wake up! + # logger.debug("Tapping worker thread {}, notified!".format(self.tid)) + self.isSleeping = False # No race condition for sure + self.stepGate.release() # this finishes before .wait() can return + # logger.debug("Tapping worker thread {}, lock released".format(self.tid)) + time.sleep(0) # let the released thread run a bit, IMPORTANT, do it after release + + def doWork(self): + logger.info(" Step {}, thread {}: ".format(self.curStep, self.tid)) + self.pool.dispatcher.doWork() + + +# We define a class to run a number of threads in locking steps. 
+class SteppingThreadPool: + def __init__(self, numThreads, maxSteps, funcSequencer): + self.numThreads = numThreads + self.maxSteps = maxSteps + self.funcSequencer = funcSequencer + # Internal class variables + self.dispatcher = WorkDispatcher(self) + self.curStep = 0 + self.threadList = [] + # self.stepGate = threading.Condition() # Gate to hold/sync all threads + self.numWaitingThreads = 0 + + # Thread coordination + self.lock = threading.Lock() # for critical section execution + self.mainGate = threading.Condition() + + # starting to run all the threads, in locking steps + def run(self): + # Create the threads + for tid in range(0, self.numThreads): + workerThread = WorkerThread(self, tid) + self.threadList.append(workerThread) + workerThread.start() # start, but should block immediately before step 0 + + # Coordinate all threads step by step + self.curStep = -1 # not started yet + while(self.curStep < self.maxSteps): + logger.debug("Main thread going to sleep") + self.mainGate.acquire() + self.mainGate.wait() # start snoozing + self.mainGate.release + logger.debug("Main thread woke up") # Now not all threads had time to go to sleep + time.sleep(0.01) # This is like forever + + self.curStep += 1 # starts with 0 + self.tapAllThreads() + + # The threads will run through many steps + for workerThread in self.threadList: + workerThread.thread.join() # slight hack, accessing members + + logger.info("All threads finished") + + def reportThreadWaiting(self): + allThreadWaiting = False + with self.lock: + self.numWaitingThreads += 1 + if ( self.numWaitingThreads == self.numThreads ): + allThreadWaiting = True + + if (allThreadWaiting): # aha, pass the baton to the main thread + logger.debug("All threads are now waiting") + self.numWaitingThreads = 0 # do this 1st to avoid race condition + # time.sleep(0.001) # thread yield, so main thread can be ready + self.mainGate.acquire() + self.mainGate.notify() # main thread would now start to run + self.mainGate.release() + 
time.sleep(0) # yield, maybe main thread can run for just a bit + + # def waitForStep(self): + # shouldWait = True; + # with self.lock: + # # if ( self.numWaitingThreads == 0 ): # first one here + # # self.stepGate.acquire() # acquire the underlying lock + + # self.numWaitingThreads += 1 + # # if ( self.numWaitingThreads < self.numThreads ): + # # do nothing, we should wait + # if ( self.numWaitingThreads == self.numThreads ): + # shouldWait = False # we should now wake up + # elif ( self.numWaitingThreads > self.numThreads ): + # raise RuntimeError("Corrupt state") + + # self.stepGate.acquire() + # if (shouldWait): + # self.stepGate.wait() + # else: + # self.numWaitingThreads = 0 # fresh start + # self.curStep += 1 # do this before letting all threads loose + # print("--> Starting step {}".format(self.curStep), end="\r\n") # before notify_all + # # self.stepGate.notify_all() + # self.wakeUpAll() + # self.stepGate.release() + # return self.curStep + + def tapAllThreads(self): # in a deterministic manner + wakeSeq = [] + for i in range(self.numThreads): # generate a random sequence + if Dice.throw(2) == 1 : + wakeSeq.append(i) + else: + wakeSeq.insert(0, i) + logger.info("Waking up threads: {}".format(str(wakeSeq))) + for i in wakeSeq: + self.threadList[i].tapStepGate() + time.sleep(0) # yield + +# A queue of continguous POSITIVE integers +class LinearQueue(): + def __init__(self): + self.firstIndex = 1 + self.lastIndex = 0 + + def push(self): # Push to the tail (largest) + if ( self.firstIndex > self.lastIndex ): # impossible, meaning it's empty + self.lastIndex = self.firstIndex + return self.firstIndex + # Otherwise we have something + self.lastIndex += 1 + return self.lastIndex + + def pop(self): + if ( self.firstIndex > self.lastIndex ): # empty + return 0 + index = self.firstIndex + self.firstIndex += 1 + return index + + +# State of the database as we believe it to be +class DbState(): + def __init__(self): + self.tableNumQueue = LinearQueue() + 
self.openDbServerConnection() + + def openDbServerConnection(self): + cfgPath = "../../build/test/cfg" # was: tdDnodes.getSimCfgPath() + conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable + + tdSql.init(conn.cursor()) + tdSql.prepare() # Recreate database, etc. + # tdSql.execute('show databases') + + def closeDbServerConnection(self): + tdSql.close() + tdLog.info("Disconnecting from database server") + + def getTableNameToCreate(self): + tblNum = self.tableNumQueue.push() + return "table_{}".format(tblNum) + + def getTableNameToDelete(self): + tblNum = self.tableNumQueue.pop() + if( tblNum==0 ) : + return False + return "table_{}".format(tblNum) + +class Task(): + def execute(self): + raise RuntimeError("Must be overriden by child class") + +class CreateTableTask(Task): + def execute(self): + tableName = dbState.getTableNameToCreate() + logger.info(" Creating a table {} ...".format(tableName)) + tdSql.execute("create table {} (ts timestamp, speed int)".format(tableName)) + +class DropTableTask(Task): + def execute(self): + tableName = dbState.getTableNameToDelete() + if ( not tableName ): # May be "False" + logger.info("Cannot generate a table to delete, skipping...") + return + logger.info(" Dropping a table {} ...".format(tableName)) + tdSql.execute("drop table {}".format(tableName)) + +class AddDataTask(Task): + def execute(self): + logger.info(" Adding some data...") + +# Deterministic random number generator +class Dice(): + seeded = False # static, uninitialized + + @classmethod + def seed(cls, s): # static + if (cls.seeded): + raise RuntimeError("Cannot seed the random generator more than once") + cls.verifyRNG() + random.seed(s) + cls.seeded = True # TODO: protect against multi-threading + + @classmethod + def verifyRNG(cls): # Verify that the RNG is determinstic + random.seed(0) + x1 = random.randrange(0, 1000) + x2 = random.randrange(0, 1000) + x3 = random.randrange(0, 1000) + if ( x1 != 864 or x2!=394 or x3!=776 ): + raise 
RuntimeError("System RNG is not deterministic") + + @classmethod + def throw(cls, max): # get 0 to max-1 + return cls.throwRange(0, max) + + @classmethod + def throwRange(cls, min, max): # up to max-1 + if ( not cls.seeded ): + raise RuntimeError("Cannot throw dice before seeding it") + return random.randrange(min, max) + + +# Anyone needing to carry out work should simply come here +class WorkDispatcher(): + def __init__(self, pool): + self.pool = pool + self.totalNumMethods = 2 + self.tasks = [ + CreateTableTask(), + DropTableTask(), + AddDataTask(), + ] + + def throwDice(self): + return random.randint(0, len(self.tasks) - 1) + + def doWork(self): + dice = self.throwDice() + task = self.tasks[dice] + task.execute() + +if __name__ == "__main__": + logger = logging.getLogger('myApp') + logger.setLevel(LOGGING_LEVEL) + ch = logging.StreamHandler() + logger.addHandler(ch) + + Dice.seed(0) # initial seeding of dice + dbState = DbState() + threadPool = SteppingThreadPool(3, 5, 0) + threadPool.run() + logger.info("Finished running thread pool") + dbState.closeDbServerConnection() + From 73bdaae15ca2459241dee16dc9ce31c84ad437d7 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Mon, 27 Apr 2020 01:52:23 -0700 Subject: [PATCH 03/17] Now using Python barriers and events to sync threads --- tests/pytest/random_walk.py | 234 ++++++++++++++++++++++++++---------- 1 file changed, 170 insertions(+), 64 deletions(-) diff --git a/tests/pytest/random_walk.py b/tests/pytest/random_walk.py index 5866351a99..146066431d 100755 --- a/tests/pytest/random_walk.py +++ b/tests/pytest/random_walk.py @@ -17,6 +17,7 @@ import getopt import threading import random import logging +import datetime from util.log import * from util.dnodes import * @@ -32,6 +33,34 @@ def runThread(workerThread): logger.info("Running Thread: {}".format(workerThread.tid)) workerThread.run() +# Used by one process to block till another is ready +# class Baton: +# def __init__(self): +# self._lock = threading.Lock() # 
control access to object +# self._baton = threading.Condition() # let thread block +# self._hasGiver = False +# self._hasTaker = False + +# def give(self): +# with self._lock: +# if ( self._hasGiver ): # already? +# raise RuntimeError("Cannot double-give a baton") +# self._hasGiver = True + +# self._settle() # may block, OUTSIDE self lock + +# def take(self): +# with self._lock: +# if ( self._hasTaker): +# raise RuntimeError("Cannot double-take a baton") +# self._hasTaker = True + +# self._settle() + +# def _settle(self): + + + class WorkerThread: def __init__(self, pool, tid): # note: main thread context! self.curStep = -1 @@ -39,14 +68,14 @@ class WorkerThread: self.tid = tid # self.threadIdent = threading.get_ident() self.thread = threading.Thread(target=runThread, args=(self,)) - self.stepGate = threading.Condition() + self.stepGate = threading.Event() def start(self): self.thread.start() # AFTER the thread is recorded def run(self): # initialization after thread starts, in the thread context - self.isSleeping = False + # self.isSleeping = False while self.curStep < self.pool.maxSteps: # stepNo = self.pool.waitForStep() # Step to run @@ -65,23 +94,26 @@ class WorkerThread: if ( not self.thread.is_alive() ): raise RuntimeError("Unexpected dead thread") - def verifyIsSleeping(self, isSleeping): - if ( isSleeping != self.isSleeping ): - raise RuntimeError("Unexpected thread sleep status") + # def verifyIsSleeping(self, isSleeping): + # if ( isSleeping != self.isSleeping ): + # raise RuntimeError("Unexpected thread sleep status") + # A gate is different from a barrier in that a thread needs to be "tapped" def crossStepGate(self): self.verifyThreadAlive() self.verifyThreadSelf() # only allowed by ourselves - self.verifyIsSleeping(False) # has to be awake + # self.verifyIsSleeping(False) # has to be awake - logger.debug("Worker thread {} going to sleep".format(self.tid)) - self.isSleeping = True # TODO: maybe too early? 
- self.pool.reportThreadWaiting() # TODO: this triggers the main thread, TOO early + logger.debug("Worker thread {} about to cross pool barrier".format(self.tid)) + # self.isSleeping = True # TODO: maybe too early? + self.pool.crossPoolBarrier() # wait for all other threads - # Actually going to sleep - self.stepGate.acquire() # acquire lock immediately - self.stepGate.wait() # release and then acquire - self.stepGate.release() # release + # Wait again at the "gate", waiting to be "tapped" + logger.debug("Worker thread {} about to cross the step gate".format(self.tid)) + # self.stepGate.acquire() # acquire lock immediately + self.stepGate.wait() + self.stepGate.clear() + # self.stepGate.release() # release logger.debug("Worker thread {} woke up".format(self.tid)) # Someone will wake us up here @@ -90,15 +122,15 @@ class WorkerThread: def tapStepGate(self): # give it a tap, release the thread waiting there self.verifyThreadAlive() self.verifyThreadMain() # only allowed for main thread - self.verifyIsSleeping(True) # has to be sleeping + # self.verifyIsSleeping(True) # has to be sleeping logger.debug("Tapping worker thread {}".format(self.tid)) - self.stepGate.acquire() + # self.stepGate.acquire() # logger.debug("Tapping worker thread {}, lock acquired".format(self.tid)) - self.stepGate.notify() # wake up! + self.stepGate.set() # wake up! # logger.debug("Tapping worker thread {}, notified!".format(self.tid)) - self.isSleeping = False # No race condition for sure - self.stepGate.release() # this finishes before .wait() can return + # self.isSleeping = False # No race condition for sure + # self.stepGate.release() # this finishes before .wait() can return # logger.debug("Tapping worker thread {}, lock released".format(self.tid)) time.sleep(0) # let the released thread run a bit, IMPORTANT, do it after release @@ -109,20 +141,21 @@ class WorkerThread: # We define a class to run a number of threads in locking steps. 
class SteppingThreadPool: - def __init__(self, numThreads, maxSteps, funcSequencer): + def __init__(self, dbState, numThreads, maxSteps, funcSequencer): self.numThreads = numThreads self.maxSteps = maxSteps self.funcSequencer = funcSequencer # Internal class variables - self.dispatcher = WorkDispatcher(self) + self.dispatcher = WorkDispatcher(self, dbState) self.curStep = 0 self.threadList = [] # self.stepGate = threading.Condition() # Gate to hold/sync all threads - self.numWaitingThreads = 0 - + # self.numWaitingThreads = 0 + # Thread coordination - self.lock = threading.Lock() # for critical section execution - self.mainGate = threading.Condition() + self.barrier = threading.Barrier(numThreads + 1) # plus main thread + # self.lock = threading.Lock() # for critical section execution + # self.mainGate = threading.Condition() # starting to run all the threads, in locking steps def run(self): @@ -136,13 +169,15 @@ class SteppingThreadPool: self.curStep = -1 # not started yet while(self.curStep < self.maxSteps): logger.debug("Main thread going to sleep") - self.mainGate.acquire() - self.mainGate.wait() # start snoozing - self.mainGate.release - logger.debug("Main thread woke up") # Now not all threads had time to go to sleep - time.sleep(0.01) # This is like forever + # self.mainGate.acquire() + # self.mainGate.wait() # start snoozing + # self.mainGate.release + self.crossPoolBarrier() + self.barrier.reset() # Other worker threads should now be at the "gate" + + logger.debug("Main thread waking up, tapping worker threads".format(self.curStep)) # Now not all threads had time to go to sleep + # time.sleep(0.01) # This is like forever - self.curStep += 1 # starts with 0 self.tapAllThreads() # The threads will run through many steps @@ -151,21 +186,29 @@ class SteppingThreadPool: logger.info("All threads finished") - def reportThreadWaiting(self): - allThreadWaiting = False - with self.lock: - self.numWaitingThreads += 1 - if ( self.numWaitingThreads == self.numThreads 
): - allThreadWaiting = True + def crossPoolBarrier(self): + if ( self.barrier.n_waiting == self.numThreads ): # everyone else is waiting, inc main thread + logger.info("<-- Step {} finished".format(self.curStep)) + self.curStep += 1 # we are about to get into next step. TODO: race condition here! + logger.debug(" ") # line break + logger.debug("--> Step {} starts with main thread waking up".format(self.curStep)) # Now not all threads had time to go to sleep + - if (allThreadWaiting): # aha, pass the baton to the main thread - logger.debug("All threads are now waiting") - self.numWaitingThreads = 0 # do this 1st to avoid race condition - # time.sleep(0.001) # thread yield, so main thread can be ready - self.mainGate.acquire() - self.mainGate.notify() # main thread would now start to run - self.mainGate.release() - time.sleep(0) # yield, maybe main thread can run for just a bit + self.barrier.wait() + # allThreadWaiting = False + # with self.lock: + # self.numWaitingThreads += 1 + # if ( self.numWaitingThreads == self.numThreads ): + # allThreadWaiting = True + + # if (allThreadWaiting): # aha, pass the baton to the main thread + # logger.debug("All threads are now waiting") + # self.numWaitingThreads = 0 # do this 1st to avoid race condition + # # time.sleep(0.001) # thread yield, so main thread can be ready + # self.mainGate.acquire() + # self.mainGate.notify() # main thread would now start to run + # self.mainGate.release() + # time.sleep(0) # yield, maybe main thread can run for just a bit # def waitForStep(self): # shouldWait = True; @@ -201,6 +244,7 @@ class SteppingThreadPool: else: wakeSeq.insert(0, i) logger.info("Waking up threads: {}".format(str(wakeSeq))) + # TODO: set dice seed to a deterministic value for i in wakeSeq: self.threadList[i].tapStepGate() time.sleep(0) # yield @@ -208,30 +252,86 @@ class SteppingThreadPool: # A queue of continguous POSITIVE integers class LinearQueue(): def __init__(self): - self.firstIndex = 1 + self.firstIndex = 1 # 1st 
ever element self.lastIndex = 0 + self.lock = threading.RLock() # our functions may call each other + self.inUse = set() # the indexes that are in use right now def push(self): # Push to the tail (largest) - if ( self.firstIndex > self.lastIndex ): # impossible, meaning it's empty - self.lastIndex = self.firstIndex - return self.firstIndex - # Otherwise we have something - self.lastIndex += 1 - return self.lastIndex + with self.lock: + if ( self.firstIndex > self.lastIndex ): # impossible, meaning it's empty + self.lastIndex = self.firstIndex + return self.firstIndex + # Otherwise we have something + self.lastIndex += 1 + return self.lastIndex def pop(self): - if ( self.firstIndex > self.lastIndex ): # empty - return 0 - index = self.firstIndex - self.firstIndex += 1 - return index + with self.lock: + if ( self.isEmpty() ): + raise RuntimeError("Cannot pop an empty queue") + index = self.firstIndex + self.firstIndex += 1 + return index + + def isEmpty(self): + return self.firstIndex > self.lastIndex + + def popIfNotEmpty(self): + with self.lock: + if (self.isEmpty()): + return 0 + return self.pop() + + def use(self, i): + with self.lock: + if ( i in self.inUse ): + raise RuntimeError("Cannot re-use same index in queue: {}".format(i)) + self.inUse.add(i) + + def unUse(self, i): + with self.lock: + self.inUse.remove(i) # KeyError possible + + def size(self): + return self.lastIndex + 1 - self.firstIndex + + def allocate(self): + with self.lock: + cnt = 0 # counting the interations + while True: + cnt += 1 + if ( cnt > self.size()*10 ): # 10x iteration already + raise RuntimeError("Failed to allocate LinearQueue element") + ret = Dice.throwRange(self.firstIndex, self.lastIndex+1) + if ( not ret in self.inUse ): + return self.use(ret) # State of the database as we believe it to be class DbState(): def __init__(self): self.tableNumQueue = LinearQueue() + self.tick = datetime.datetime(2019, 1, 1) # initial date time tick + self.int = 0 # initial integer 
self.openDbServerConnection() + self.lock = threading.RLock() + + def pickTable(self): # pick any table, and "use" it + return self.tableNumQueue.allocate() + + def getNextTick(self): + with self.lock: # prevent duplicate tick + self.tick += datetime.timedelta(0, 1) # add one second to it + return self.tick + + def getNextInt(self): + with self.lock: + self.int += 1 + return self.int + + def unuseTable(self, i): # return the table back, so others can use it + self.tableNumQueue.unUse(i) def openDbServerConnection(self): cfgPath = "../../build/test/cfg" # was: tdDnodes.getSimCfgPath() @@ -250,12 +350,15 @@ class DbState(): return "table_{}".format(tblNum) def getTableNameToDelete(self): - tblNum = self.tableNumQueue.pop() - if( tblNum==0 ) : + if self.tableNumQueue.isEmpty: return False + tblNum = self.tableNumQueue.pop() # TODO: race condition! return "table_{}".format(tblNum) class Task(): + def __init__(self, dbState): + self.dbState = dbState + def execute(self): raise RuntimeError("Must be overriden by child class") @@ -277,6 +380,9 @@ class DropTableTask(Task): class AddDataTask(Task): def execute(self): logger.info(" Adding some data...") + # ds = self.dbState + # tIndex = self.dbState.pickTable() + # tdSql.execute("insert into table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())) # Deterministic random number generator class Dice(): @@ -312,13 +418,13 @@ class Dice(): # Anyone needing to carry out work should simply come here class WorkDispatcher(): - def __init__(self, pool): + def __init__(self, pool, dbState): self.pool = pool - self.totalNumMethods = 2 + # self.totalNumMethods = 2 self.tasks = [ - CreateTableTask(), - DropTableTask(), - AddDataTask(), + CreateTableTask(dbState), + DropTableTask(dbState), + # AddDataTask(dbState), ] def throwDice(self): @@ -337,7 +443,7 @@ if __name__ == "__main__": Dice.seed(0) # initial seeding of dice dbState = DbState() - threadPool = SteppingThreadPool(3, 5, 0) + threadPool = 
SteppingThreadPool(dbState, 3, 5, 0) threadPool.run() logger.info("Finished running thread pool") dbState.closeDbServerConnection() From dc2b3437bdf7c2d78cfec2e03bc0770bc015d866 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Mon, 27 Apr 2020 01:58:05 -0700 Subject: [PATCH 04/17] Recreated the tscServer.c:312 crash --- tests/pytest/random_walk.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/pytest/random_walk.py b/tests/pytest/random_walk.py index 146066431d..bf714ba694 100755 --- a/tests/pytest/random_walk.py +++ b/tests/pytest/random_walk.py @@ -350,7 +350,7 @@ class DbState(): return "table_{}".format(tblNum) def getTableNameToDelete(self): - if self.tableNumQueue.isEmpty: + if self.tableNumQueue.isEmpty(): return False tblNum = self.tableNumQueue.pop() # TODO: race condition! return "table_{}".format(tblNum) From 884e7fe6352405dd7e36c1c850d6e8b517f0628e Mon Sep 17 00:00:00 2001 From: Steven Li Date: Mon, 27 Apr 2020 02:03:11 -0700 Subject: [PATCH 05/17] Recreated the tscServer.c:312 crash --- tests/pytest/random_walk.py | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/tests/pytest/random_walk.py b/tests/pytest/random_walk.py index bf714ba694..6c4b5c8eec 100755 --- a/tests/pytest/random_walk.py +++ b/tests/pytest/random_walk.py @@ -283,20 +283,20 @@ class LinearQueue(): return 0 return self.pop() - def use(self, i): + def allocate(self, i): with self.lock: if ( i in self.inUse ): raise RuntimeError("Cannot re-use same index in queue: {}".format(i)) self.inUse.add(i) - def unUse(self, i): + def release(self, i): with self.lock: self.inUse.remove(i) # KeyError possible def size(self): return self.lastIndex + 1 - self.firstIndex - def allocate(self): + def pickAndAllocate(self): with self.lock: cnt = 0 # counting the interations while True: @@ -305,7 +305,7 @@ class LinearQueue(): raise RuntimeError("Failed to allocate LinearQueue element") ret = Dice.throwRange(self.firstIndex, 
self.lastIndex+1) if ( not ret in self.inUse ): - return self.use(ret) + return self.allocate(ret) # State of the database as we believe it to be @@ -317,8 +317,12 @@ class DbState(): self.openDbServerConnection() self.lock = threading.RLock() - def pickTable(self): # pick any table, and "use" it - return self.tableNumQueue.allocate() + def pickAndAllocateTable(self): # pick any table, and "use" it + return self.tableNumQueue.pickAndAllocate() + + def releaseTable(self, i): # return the table back, so others can use it + self.tableNumQueue.release(i) + def getNextTick(self): with self.lock: # prevent duplicate tick @@ -330,9 +334,6 @@ class DbState(): self.int += 1 return self.int - def unuseTable(self, i): # return the table back, so others can use it - self.tableNumQueue.unUse(i) - def openDbServerConnection(self): cfgPath = "../../build/test/cfg" # was: tdDnodes.getSimCfgPath() conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable @@ -380,9 +381,10 @@ class DropTableTask(Task): class AddDataTask(Task): def execute(self): logger.info(" Adding some data...") - # ds = self.dbState - # tIndex = self.dbState.pickTable() - # tdSql.execute("insert into table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())) + ds = self.dbState + tIndex = ds.pickTable() + tdSql.execute("insert into table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())) + ds.r # Deterministic random number generator class Dice(): @@ -423,8 +425,8 @@ class WorkDispatcher(): # self.totalNumMethods = 2 self.tasks = [ CreateTableTask(dbState), - DropTableTask(dbState), - # AddDataTask(dbState), + # DropTableTask(dbState), + AddDataTask(dbState), ] def throwDice(self): @@ -443,7 +445,7 @@ if __name__ == "__main__": Dice.seed(0) # initial seeding of dice dbState = DbState() - threadPool = SteppingThreadPool(dbState, 3, 5, 0) + threadPool = SteppingThreadPool(dbState, 1, 5, 0) threadPool.run() logger.info("Finished running thread pool") 
dbState.closeDbServerConnection() From 6768ae81a68b0bc4b1c5829f557efc29fc9d93dc Mon Sep 17 00:00:00 2001 From: Steven Li Date: Tue, 28 Apr 2020 00:12:53 -0700 Subject: [PATCH 06/17] Now each WorkerThread provides its own SQL execution, but still using a shared db connection --- .../python/linux/python3/taos/cursor.py | 9 + tests/pytest/random_walk.py | 245 ++++++++++-------- 2 files changed, 141 insertions(+), 113 deletions(-) diff --git a/src/connector/python/linux/python3/taos/cursor.py b/src/connector/python/linux/python3/taos/cursor.py index 7a0ae767bc..eee03770ca 100644 --- a/src/connector/python/linux/python3/taos/cursor.py +++ b/src/connector/python/linux/python3/taos/cursor.py @@ -1,6 +1,8 @@ from .cinterface import CTaosInterface from .error import * +querySeqNum = 0 + class TDengineCursor(object): """Database cursor which is used to manage the context of a fetch operation. @@ -109,7 +111,14 @@ class TDengineCursor(object): if params is not None: pass + + # global querySeqNum + # querySeqNum += 1 + # localSeqNum = querySeqNum # avoid raice condition + # print(" >> Exec Query ({}): {}".format(localSeqNum, str(stmt))) res = CTaosInterface.query(self._connection._conn, stmt) + # print(" << Query ({}) Exec Done".format(localSeqNum)) + if res == 0: if CTaosInterface.fieldsCount(self._connection._conn) == 0: self._affected_rows += CTaosInterface.affectedRows(self._connection._conn) diff --git a/tests/pytest/random_walk.py b/tests/pytest/random_walk.py index 6c4b5c8eec..5125d28ec0 100755 --- a/tests/pytest/random_walk.py +++ b/tests/pytest/random_walk.py @@ -62,26 +62,35 @@ def runThread(workerThread): class WorkerThread: - def __init__(self, pool, tid): # note: main thread context! + def __init__(self, pool, tid, dbState): # note: main thread context! 
self.curStep = -1 self.pool = pool self.tid = tid + self.dbState = dbState # self.threadIdent = threading.get_ident() self.thread = threading.Thread(target=runThread, args=(self,)) self.stepGate = threading.Event() + # Let us have a DB connection of our own + self._dbConn = DbConn() + + def start(self): self.thread.start() # AFTER the thread is recorded def run(self): # initialization after thread starts, in the thread context # self.isSleeping = False + self._dbConn.open() while self.curStep < self.pool.maxSteps: # stepNo = self.pool.waitForStep() # Step to run self.crossStepGate() # self.curStep will get incremented self.doWork() + # clean up + self._dbConn.close() + def verifyThreadSelf(self): # ensure we are called by this own thread if ( threading.get_ident() != self.thread.ident ): raise RuntimeError("Unexpectly called from other threads") @@ -136,7 +145,10 @@ class WorkerThread: def doWork(self): logger.info(" Step {}, thread {}: ".format(self.curStep, self.tid)) - self.pool.dispatcher.doWork() + self.pool.dispatcher.doWork(self) + + def execSql(self, sql): + return self.dbState.execSql(sql) # We define a class to run a number of threads in locking steps. 
@@ -158,10 +170,9 @@ class SteppingThreadPool: # self.mainGate = threading.Condition() # starting to run all the threads, in locking steps - def run(self): - # Create the threads - for tid in range(0, self.numThreads): - workerThread = WorkerThread(self, tid) + def run(self): + for tid in range(0, self.numThreads): # Create the threads + workerThread = WorkerThread(self, tid, dbState) self.threadList.append(workerThread) workerThread.start() # start, but should block immediately before step 0 @@ -169,15 +180,10 @@ class SteppingThreadPool: self.curStep = -1 # not started yet while(self.curStep < self.maxSteps): logger.debug("Main thread going to sleep") - # self.mainGate.acquire() - # self.mainGate.wait() # start snoozing - # self.mainGate.release self.crossPoolBarrier() - self.barrier.reset() # Other worker threads should now be at the "gate" - + self.barrier.reset() # Other worker threads should now be at the "gate" + logger.debug("Main thread waking up, tapping worker threads".format(self.curStep)) # Now not all threads had time to go to sleep - # time.sleep(0.01) # This is like forever - self.tapAllThreads() # The threads will run through many steps @@ -192,49 +198,8 @@ class SteppingThreadPool: self.curStep += 1 # we are about to get into next step. TODO: race condition here! 
logger.debug(" ") # line break logger.debug("--> Step {} starts with main thread waking up".format(self.curStep)) # Now not all threads had time to go to sleep - self.barrier.wait() - # allThreadWaiting = False - # with self.lock: - # self.numWaitingThreads += 1 - # if ( self.numWaitingThreads == self.numThreads ): - # allThreadWaiting = True - - # if (allThreadWaiting): # aha, pass the baton to the main thread - # logger.debug("All threads are now waiting") - # self.numWaitingThreads = 0 # do this 1st to avoid race condition - # # time.sleep(0.001) # thread yield, so main thread can be ready - # self.mainGate.acquire() - # self.mainGate.notify() # main thread would now start to run - # self.mainGate.release() - # time.sleep(0) # yield, maybe main thread can run for just a bit - - # def waitForStep(self): - # shouldWait = True; - # with self.lock: - # # if ( self.numWaitingThreads == 0 ): # first one here - # # self.stepGate.acquire() # acquire the underlying lock - - # self.numWaitingThreads += 1 - # # if ( self.numWaitingThreads < self.numThreads ): - # # do nothing, we should wait - # if ( self.numWaitingThreads == self.numThreads ): - # shouldWait = False # we should now wake up - # elif ( self.numWaitingThreads > self.numThreads ): - # raise RuntimeError("Corrupt state") - - # self.stepGate.acquire() - # if (shouldWait): - # self.stepGate.wait() - # else: - # self.numWaitingThreads = 0 # fresh start - # self.curStep += 1 # do this before letting all threads loose - # print("--> Starting step {}".format(self.curStep), end="\r\n") # before notify_all - # # self.stepGate.notify_all() - # self.wakeUpAll() - # self.stepGate.release() - # return self.curStep def tapAllThreads(self): # in a deterministic manner wakeSeq = [] @@ -254,23 +219,32 @@ class LinearQueue(): def __init__(self): self.firstIndex = 1 # 1st ever element self.lastIndex = 0 - self.lock = threading.RLock() # our functions may call each other + self._lock = threading.RLock() # our functions may call 
each other self.inUse = set() # the indexes that are in use right now - def push(self): # Push to the tail (largest) - with self.lock: - if ( self.firstIndex > self.lastIndex ): # impossible, meaning it's empty - self.lastIndex = self.firstIndex - return self.firstIndex + def toText(self): + return "[{}..{}], in use: {}".format(self.firstIndex, self.lastIndex, self.inUse) + + # Push (add new element, largest) to the tail, and mark it in use + def push(self): + with self._lock: + # if ( self.isEmpty() ): + # self.lastIndex = self.firstIndex + # return self.firstIndex # Otherwise we have something self.lastIndex += 1 + self.allocate(self.lastIndex) + # self.inUse.add(self.lastIndex) # mark it in use immediately return self.lastIndex def pop(self): - with self.lock: + with self._lock: if ( self.isEmpty() ): raise RuntimeError("Cannot pop an empty queue") index = self.firstIndex + if ( index in self.inUse ): + self.inUse.remove(index) # TODO: what about discard? + self.firstIndex += 1 return index @@ -278,113 +252,155 @@ class LinearQueue(): return self.firstIndex > self.lastIndex def popIfNotEmpty(self): - with self.lock: + with self._lock: if (self.isEmpty()): return 0 return self.pop() def allocate(self, i): - with self.lock: + with self._lock: if ( i in self.inUse ): raise RuntimeError("Cannot re-use same index in queue: {}".format(i)) self.inUse.add(i) def release(self, i): - with self.lock: + with self._lock: self.inUse.remove(i) # KeyError possible def size(self): return self.lastIndex + 1 - self.firstIndex def pickAndAllocate(self): - with self.lock: + if ( self.isEmpty() ): + return None + with self._lock: cnt = 0 # counting the interations while True: cnt += 1 if ( cnt > self.size()*10 ): # 10x iteration already - raise RuntimeError("Failed to allocate LinearQueue element") + # raise RuntimeError("Failed to allocate LinearQueue element") + return None ret = Dice.throwRange(self.firstIndex, self.lastIndex+1) if ( not ret in self.inUse ): - return 
self.allocate(ret) + self.allocate(ret) + return ret +class DbConn: + def __init__(self): + self.isOpen = False + + def open(self): # Open connection + if ( self.isOpen ): + raise RuntimeError("Cannot re-open an existing DB connection") + + cfgPath = "../../build/test/cfg" + conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable + + self._tdSql = TDSql() + self._tdSql.init(conn.cursor()) + self.isOpen = True + + def resetDb(self): # reset the whole database, etc. + if ( not self.isOpen ): + raise RuntimeError("Cannot reset database until connection is open") + self._tdSql.prepare() # Recreate database, etc. + # tdSql.execute('show databases') + + def close(self): + if ( not self.isOpen ): + raise RuntimeError("Cannot clean up database until connection is open") + self._tdSql.close() + self.isOpen = False + + def execSql(self, sql): + return self._tdSql.execute(sql) # State of the database as we believe it to be class DbState(): def __init__(self): self.tableNumQueue = LinearQueue() - self.tick = datetime.datetime(2019, 1, 1) # initial date time tick - self.int = 0 # initial integer - self.openDbServerConnection() - self.lock = threading.RLock() + self._lastTick = datetime.datetime(2019, 1, 1) # initial date time tick + self._lastInt = 0 # next one is initial integer + self._lock = threading.RLock() + + # self.openDbServerConnection() + self._dbConn = DbConn() + self._dbConn.open() + self._dbConn.resetDb() # drop and recreate DB def pickAndAllocateTable(self): # pick any table, and "use" it return self.tableNumQueue.pickAndAllocate() + def addTable(self): + with self._lock: + tIndex = self.tableNumQueue.push() + return tIndex + def releaseTable(self, i): # return the table back, so others can use it self.tableNumQueue.release(i) - def getNextTick(self): - with self.lock: # prevent duplicate tick - self.tick += datetime.timedelta(0, 1) # add one second to it - return self.tick + with self._lock: # prevent duplicate tick + self._lastTick += 
datetime.timedelta(0, 1) # add one second to it + return self._lastTick def getNextInt(self): - with self.lock: - self.int += 1 - return self.int - - def openDbServerConnection(self): - cfgPath = "../../build/test/cfg" # was: tdDnodes.getSimCfgPath() - conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable - - tdSql.init(conn.cursor()) - tdSql.prepare() # Recreate database, etc. - # tdSql.execute('show databases') - - def closeDbServerConnection(self): - tdSql.close() - tdLog.info("Disconnecting from database server") - - def getTableNameToCreate(self): - tblNum = self.tableNumQueue.push() - return "table_{}".format(tblNum) - + with self._lock: + self._lastInt += 1 + return self._lastInt + def getTableNameToDelete(self): if self.tableNumQueue.isEmpty(): return False tblNum = self.tableNumQueue.pop() # TODO: race condition! return "table_{}".format(tblNum) + def execSql(self, sql): # using the main DB connection + return self._dbConn.execSql(sql) + + def cleanUp(self): + self._dbConn.close() + class Task(): def __init__(self, dbState): self.dbState = dbState - def execute(self): + def execute(self, workerThread): raise RuntimeError("Must be overriden by child class") + def execSql(self, sql): + return self.dbState.execute(sql) + class CreateTableTask(Task): - def execute(self): - tableName = dbState.getTableNameToCreate() - logger.info(" Creating a table {} ...".format(tableName)) - tdSql.execute("create table {} (ts timestamp, speed int)".format(tableName)) + def execute(self, wt): + tIndex = dbState.addTable() + logger.debug(" Creating a table {} ...".format(tIndex)) + wt.execSql("create table table_{} (ts timestamp, speed int)".format(tIndex)) + logger.debug(" Table {} created.".format(tIndex)) + dbState.releaseTable(tIndex) class DropTableTask(Task): - def execute(self): + def execute(self, wt): tableName = dbState.getTableNameToDelete() if ( not tableName ): # May be "False" - logger.info("Cannot generate a table to delete, 
skipping...") + logger.info(" Cannot generate a table to delete, skipping...") return logger.info(" Dropping a table {} ...".format(tableName)) - tdSql.execute("drop table {}".format(tableName)) + wt.execSql("drop table {}".format(tableName)) class AddDataTask(Task): - def execute(self): - logger.info(" Adding some data...") + def execute(self, wt): ds = self.dbState - tIndex = ds.pickTable() - tdSql.execute("insert into table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())) - ds.r + logger.info(" Adding some data... numQueue={}".format(ds.tableNumQueue.toText())) + tIndex = ds.pickAndAllocateTable() + if ( tIndex == None ): + logger.info(" No table found to add data, skipping...") + return + sql = "insert into table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) + logger.debug(" Executing SQL: {}".format(sql)) + wt.execSql(sql) + ds.releaseTable(tIndex) + logger.debug(" Finished adding data") # Deterministic random number generator class Dice(): @@ -430,12 +446,15 @@ class WorkDispatcher(): ] def throwDice(self): - return random.randint(0, len(self.tasks) - 1) + max = len(self.tasks) - 1 + dRes = random.randint(0, max) + # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes)) + return dRes - def doWork(self): + def doWork(self, workerThread): dice = self.throwDice() task = self.tasks[dice] - task.execute() + task.execute(workerThread) if __name__ == "__main__": logger = logging.getLogger('myApp') @@ -445,8 +464,8 @@ if __name__ == "__main__": Dice.seed(0) # initial seeding of dice dbState = DbState() - threadPool = SteppingThreadPool(dbState, 1, 5, 0) + threadPool = SteppingThreadPool(dbState, 3, 5, 0) threadPool.run() logger.info("Finished running thread pool") - dbState.closeDbServerConnection() + dbState.cleanUp() From 8c64960e4c4c0f6d605c390748c4a2ff5c5fe0e8 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Fri, 1 May 2020 00:01:34 -0700 Subject: [PATCH 07/17] Minor adjustments before 
sharing with Jeff --- src/connector/python/linux/python3/taos/cursor.py | 2 +- tests/pytest/{random_walk.py => crash_gen.py} | 0 tests/pytest/simpletest_no_sudo.sh | 3 +-- 3 files changed, 2 insertions(+), 3 deletions(-) rename tests/pytest/{random_walk.py => crash_gen.py} (100%) diff --git a/src/connector/python/linux/python3/taos/cursor.py b/src/connector/python/linux/python3/taos/cursor.py index eee03770ca..dfbb0f2064 100644 --- a/src/connector/python/linux/python3/taos/cursor.py +++ b/src/connector/python/linux/python3/taos/cursor.py @@ -1,7 +1,7 @@ from .cinterface import CTaosInterface from .error import * -querySeqNum = 0 +# querySeqNum = 0 class TDengineCursor(object): """Database cursor which is used to manage the context of a fetch operation. diff --git a/tests/pytest/random_walk.py b/tests/pytest/crash_gen.py similarity index 100% rename from tests/pytest/random_walk.py rename to tests/pytest/crash_gen.py diff --git a/tests/pytest/simpletest_no_sudo.sh b/tests/pytest/simpletest_no_sudo.sh index 135a7d8814..61faf3df52 100755 --- a/tests/pytest/simpletest_no_sudo.sh +++ b/tests/pytest/simpletest_no_sudo.sh @@ -10,5 +10,4 @@ export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3 export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)/../../build/build/lib # Now we are all let, and let's run our cases! 
-# python3 ./test.py -m 127.0.0.1 -f insert/basic.py -./random_walk.py +python3 ./test.py -m 127.0.0.1 -f insert/basic.py From 897cfafcf4b5b4aada883b30ee0caab152045f20 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Fri, 1 May 2020 00:12:36 -0700 Subject: [PATCH 08/17] added a missing file --- tests/pytest/crash_gen.sh | 40 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100755 tests/pytest/crash_gen.sh diff --git a/tests/pytest/crash_gen.sh b/tests/pytest/crash_gen.sh new file mode 100755 index 0000000000..4db751f997 --- /dev/null +++ b/tests/pytest/crash_gen.sh @@ -0,0 +1,40 @@ +#!/bin/bash + +# This is the script for us to try to cause the TDengine server or client to crash +# +# PREPARATION +# +# 1. Build an compile the TDengine source code that comes with this script, in the same directory tree +# 2. Please follow the direction in our README.md, and build TDengine in the build/ directory +# 3. Adjust the configuration file if needed under build/test/cfg/taos.cfg +# 4. Run the TDengine server instance: cd build; ./build/bin/taosd -c test/cfg +# 5. Make sure you have a working Python3 environment: run /usr/bin/python3 --version, and you should get 3.6 or above +# +# RUNNING THIS SCRIPT +# +# This script assumes the source code directory is intact, and that the binaries has been built in the +# build/ directory, as such, will will load the Python libraries in the directory tree, and also load +# the TDengine client shared library (so) file, in the build/directory, as evidenced in the env +# variables below. +# +# Running the script is simple, no parameter is needed (for now, but will change in the future). +# +# Happy Crashing... + + +# Due to the heavy path name assumptions/usage, let us require that the user be in the current directory +EXEC_DIR=`dirname "$0"` +if [[ $EXEC_DIR != "." 
]] +then + echo "ERROR: Please execute `basename "$0"` in its own directory (for now anyway, pardon the dust)" + exit -1 +fi + +# First we need to set up a path for Python to find our own TAOS modules, so that "import" can work. +export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3 + +# Then let us set up the library path so that our compiled SO file can be loaded by Python +export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)/../../build/build/lib + +# Now we are all let, and let's see if we can find a crash. +./crash_gen.py From 2d76c46756140da83f4d42a9766d35e1c31742fd Mon Sep 17 00:00:00 2001 From: Steven Li Date: Sun, 3 May 2020 17:28:18 -0700 Subject: [PATCH 09/17] Added per-thread-db-connection option, after getting Python argparse --- tests/pytest/crash_gen.py | 140 +++++++++++++++++++++++++------------- tests/pytest/crash_gen.sh | 5 +- 2 files changed, 96 insertions(+), 49 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 5125d28ec0..c525a59d7e 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -13,6 +13,7 @@ # -*- coding: utf-8 -*- import sys import getopt +import argparse import threading import random @@ -26,11 +27,11 @@ from util.sql import * import taos -# Constants -LOGGING_LEVEL = logging.DEBUG -def runThread(workerThread): - logger.info("Running Thread: {}".format(workerThread.tid)) +# Command-line/Environment Configurations +gConfig = None # will set a bit later + +def runThread(workerThread): workerThread.run() # Used by one process to block till another is ready @@ -63,36 +64,42 @@ def runThread(workerThread): class WorkerThread: def __init__(self, pool, tid, dbState): # note: main thread context! 
- self.curStep = -1 - self.pool = pool - self.tid = tid - self.dbState = dbState + self._curStep = -1 + self._pool = pool + self._tid = tid + self._dbState = dbState # self.threadIdent = threading.get_ident() - self.thread = threading.Thread(target=runThread, args=(self,)) - self.stepGate = threading.Event() + self._thread = threading.Thread(target=runThread, args=(self,)) + self._stepGate = threading.Event() # Let us have a DB connection of our own - self._dbConn = DbConn() + if ( gConfig.per_thread_db_connection ): + self._dbConn = DbConn() def start(self): - self.thread.start() # AFTER the thread is recorded + self._thread.start() # AFTER the thread is recorded def run(self): # initialization after thread starts, in the thread context # self.isSleeping = False - self._dbConn.open() + logger.info("Starting to run thread: {}".format(self._tid)) - while self.curStep < self.pool.maxSteps: + if ( gConfig.per_thread_db_connection ): + self._dbConn.open() + # self._dbConn.resetDb() + + while self._curStep < self._pool.maxSteps: # stepNo = self.pool.waitForStep() # Step to run self.crossStepGate() # self.curStep will get incremented self.doWork() # clean up - self._dbConn.close() + if ( gConfig.per_thread_db_connection ): + self._dbConn.close() def verifyThreadSelf(self): # ensure we are called by this own thread - if ( threading.get_ident() != self.thread.ident ): + if ( threading.get_ident() != self._thread.ident ): raise RuntimeError("Unexpectly called from other threads") def verifyThreadMain(self): # ensure we are called by the main thread @@ -100,7 +107,7 @@ class WorkerThread: raise RuntimeError("Unexpectly called from other threads") def verifyThreadAlive(self): - if ( not self.thread.is_alive() ): + if ( not self._thread.is_alive() ): raise RuntimeError("Unexpected dead thread") # def verifyIsSleeping(self, isSleeping): @@ -113,30 +120,30 @@ class WorkerThread: self.verifyThreadSelf() # only allowed by ourselves # self.verifyIsSleeping(False) # has to be awake 
- logger.debug("Worker thread {} about to cross pool barrier".format(self.tid)) + logger.debug("Worker thread {} about to cross pool barrier".format(self._tid)) # self.isSleeping = True # TODO: maybe too early? - self.pool.crossPoolBarrier() # wait for all other threads + self._pool.crossPoolBarrier() # wait for all other threads # Wait again at the "gate", waiting to be "tapped" - logger.debug("Worker thread {} about to cross the step gate".format(self.tid)) + logger.debug("Worker thread {} about to cross the step gate".format(self._tid)) # self.stepGate.acquire() # acquire lock immediately - self.stepGate.wait() - self.stepGate.clear() + self._stepGate.wait() + self._stepGate.clear() # self.stepGate.release() # release - logger.debug("Worker thread {} woke up".format(self.tid)) + logger.debug("Worker thread {} woke up".format(self._tid)) # Someone will wake us up here - self.curStep += 1 # off to a new step... + self._curStep += 1 # off to a new step... def tapStepGate(self): # give it a tap, release the thread waiting there self.verifyThreadAlive() self.verifyThreadMain() # only allowed for main thread # self.verifyIsSleeping(True) # has to be sleeping - logger.debug("Tapping worker thread {}".format(self.tid)) + logger.debug("Tapping worker thread {}".format(self._tid)) # self.stepGate.acquire() # logger.debug("Tapping worker thread {}, lock acquired".format(self.tid)) - self.stepGate.set() # wake up! + self._stepGate.set() # wake up! 
# logger.debug("Tapping worker thread {}, notified!".format(self.tid)) # self.isSleeping = False # No race condition for sure # self.stepGate.release() # this finishes before .wait() can return @@ -144,12 +151,15 @@ class WorkerThread: time.sleep(0) # let the released thread run a bit, IMPORTANT, do it after release def doWork(self): - logger.info(" Step {}, thread {}: ".format(self.curStep, self.tid)) - self.pool.dispatcher.doWork(self) + logger.info(" Step {}, thread {}: ".format(self._curStep, self._tid)) + self._pool.dispatcher.doWork(self) def execSql(self, sql): - return self.dbState.execSql(sql) - + if ( gConfig.per_thread_db_connection ): + return self._dbConn.execSql(sql) + else: + return self._dbState.getDbConn().execSql(sql) + # We define a class to run a number of threads in locking steps. class SteppingThreadPool: @@ -188,7 +198,7 @@ class SteppingThreadPool: # The threads will run through many steps for workerThread in self.threadList: - workerThread.thread.join() # slight hack, accessing members + workerThread._thread.join() # slight hack, accessing members logger.info("All threads finished") @@ -240,10 +250,15 @@ class LinearQueue(): def pop(self): with self._lock: if ( self.isEmpty() ): - raise RuntimeError("Cannot pop an empty queue") + # raise RuntimeError("Cannot pop an empty queue") + return False # TODO: None? + index = self.firstIndex if ( index in self.inUse ): - self.inUse.remove(index) # TODO: what about discard? + return False + + # if ( index in self.inUse ): + # self.inUse.remove(index) # TODO: what about discard? 
self.firstIndex += 1 return index @@ -259,13 +274,15 @@ class LinearQueue(): def allocate(self, i): with self._lock: + # logger.debug("LQ allocating item {}".format(i)) if ( i in self.inUse ): raise RuntimeError("Cannot re-use same index in queue: {}".format(i)) self.inUse.add(i) def release(self, i): with self._lock: - self.inUse.remove(i) # KeyError possible + # logger.debug("LQ releasing item {}".format(i)) + self.inUse.remove(i) # KeyError possible, TODO: why? def size(self): return self.lastIndex + 1 - self.firstIndex @@ -287,6 +304,8 @@ class LinearQueue(): class DbConn: def __init__(self): + self._conn = None + self._cursor = None self.isOpen = False def open(self): # Open connection @@ -294,16 +313,27 @@ class DbConn: raise RuntimeError("Cannot re-open an existing DB connection") cfgPath = "../../build/test/cfg" - conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable + self._conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable + self._cursor = self._conn.cursor() + # Get the connection/cursor ready + self._cursor.execute('reset query cache') + # self._cursor.execute('use db') + + # Open connection self._tdSql = TDSql() - self._tdSql.init(conn.cursor()) + self._tdSql.init(self._cursor) self.isOpen = True def resetDb(self): # reset the whole database, etc. if ( not self.isOpen ): raise RuntimeError("Cannot reset database until connection is open") - self._tdSql.prepare() # Recreate database, etc. + # self._tdSql.prepare() # Recreate database, etc. 
+ + self._cursor.execute('drop database if exists db') + self._cursor.execute('create database db') + # self._cursor.execute('use db') + # tdSql.execute('show databases') def close(self): @@ -312,7 +342,9 @@ class DbConn: self._tdSql.close() self.isOpen = False - def execSql(self, sql): + def execSql(self, sql): + if ( not self.isOpen ): + raise RuntimeError("Cannot query database until connection is open") return self._tdSql.execute(sql) # State of the database as we believe it to be @@ -328,6 +360,9 @@ class DbState(): self._dbConn.open() self._dbConn.resetDb() # drop and recreate DB + def getDbConn(self): + return self._dbConn + def pickAndAllocateTable(self): # pick any table, and "use" it return self.tableNumQueue.pickAndAllocate() @@ -350,9 +385,10 @@ class DbState(): return self._lastInt def getTableNameToDelete(self): - if self.tableNumQueue.isEmpty(): - return False tblNum = self.tableNumQueue.pop() # TODO: race condition! + if ( not tblNum ): # maybe false + return False + return "table_{}".format(tblNum) def execSql(self, sql): # using the main DB connection @@ -375,7 +411,7 @@ class CreateTableTask(Task): def execute(self, wt): tIndex = dbState.addTable() logger.debug(" Creating a table {} ...".format(tIndex)) - wt.execSql("create table table_{} (ts timestamp, speed int)".format(tIndex)) + wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex)) logger.debug(" Table {} created.".format(tIndex)) dbState.releaseTable(tIndex) @@ -385,8 +421,8 @@ class DropTableTask(Task): if ( not tableName ): # May be "False" logger.info(" Cannot generate a table to delete, skipping...") return - logger.info(" Dropping a table {} ...".format(tableName)) - wt.execSql("drop table {}".format(tableName)) + logger.info(" Dropping a table db.{} ...".format(tableName)) + wt.execSql("drop table db.{}".format(tableName)) class AddDataTask(Task): def execute(self, wt): @@ -396,7 +432,7 @@ class AddDataTask(Task): if ( tIndex == None ): logger.info(" No table 
found to add data, skipping...") return - sql = "insert into table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) + sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) logger.debug(" Executing SQL: {}".format(sql)) wt.execSql(sql) ds.releaseTable(tIndex) @@ -441,7 +477,7 @@ class WorkDispatcher(): # self.totalNumMethods = 2 self.tasks = [ CreateTableTask(dbState), - # DropTableTask(dbState), + DropTableTask(dbState), AddDataTask(dbState), ] @@ -457,14 +493,24 @@ class WorkDispatcher(): task.execute(workerThread) if __name__ == "__main__": + # Super cool Python argument library: https://docs.python.org/3/library/argparse.html + parser = argparse.ArgumentParser(description='TDengine Auto Crash Generator') + parser.add_argument('-p', '--per-thread-db-connection', action='store_true', + help='Use a single shared db connection (default: false)') + parser.add_argument('-d', '--debug', action='store_true', + help='Turn on DEBUG mode for more logging (default: false)') + + gConfig = parser.parse_args() + logger = logging.getLogger('myApp') - logger.setLevel(LOGGING_LEVEL) + if ( gConfig.debug ): + logger.setLevel(logging.DEBUG) # default seems to be INFO ch = logging.StreamHandler() logger.addHandler(ch) Dice.seed(0) # initial seeding of dice dbState = DbState() - threadPool = SteppingThreadPool(dbState, 3, 5, 0) + threadPool = SteppingThreadPool(dbState, 5, 10, 0) threadPool.run() logger.info("Finished running thread pool") dbState.cleanUp() diff --git a/tests/pytest/crash_gen.sh b/tests/pytest/crash_gen.sh index 4db751f997..c845b39764 100755 --- a/tests/pytest/crash_gen.sh +++ b/tests/pytest/crash_gen.sh @@ -9,6 +9,7 @@ # 3. Adjust the configuration file if needed under build/test/cfg/taos.cfg # 4. Run the TDengine server instance: cd build; ./build/bin/taosd -c test/cfg # 5. Make sure you have a working Python3 environment: run /usr/bin/python3 --version, and you should get 3.6 or above +# 6. 
Make sure you have the proper Python packages: # sudo apt install python3-setuptools python3-pip python3-distutils # # RUNNING THIS SCRIPT # @@ -36,5 +37,5 @@ export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3 # Then let us set up the library path so that our compiled SO file can be loaded by Python export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)/../../build/build/lib -# Now we are all let, and let's see if we can find a crash. -./crash_gen.py +# Now we are all let, and let's see if we can find a crash. Note we pass all params +./crash_gen.py $@ From cc93971b67494938595f2dfa2f43882bbf397fac Mon Sep 17 00:00:00 2001 From: Steven Li Date: Sun, 3 May 2020 19:08:03 -0700 Subject: [PATCH 10/17] Fixed a barrier problem, now main/worker thread sync correctly --- tests/pytest/crash_gen.py | 80 ++++++++++++++++++++++----------------- 1 file changed, 46 insertions(+), 34 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index c525a59d7e..ee4a68eee5 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -74,8 +74,7 @@ class WorkerThread: # Let us have a DB connection of our own if ( gConfig.per_thread_db_connection ): - self._dbConn = DbConn() - + self._dbConn = DbConn() def start(self): self._thread.start() # AFTER the thread is recorded @@ -120,18 +119,16 @@ class WorkerThread: self.verifyThreadSelf() # only allowed by ourselves # self.verifyIsSleeping(False) # has to be awake - logger.debug("Worker thread {} about to cross pool barrier".format(self._tid)) + # logger.debug("Worker thread {} about to cross pool barrier".format(self._tid)) # self.isSleeping = True # TODO: maybe too early? 
self._pool.crossPoolBarrier() # wait for all other threads # Wait again at the "gate", waiting to be "tapped" - logger.debug("Worker thread {} about to cross the step gate".format(self._tid)) - # self.stepGate.acquire() # acquire lock immediately + # logger.debug("Worker thread {} about to cross the step gate".format(self._tid)) self._stepGate.wait() self._stepGate.clear() - # self.stepGate.release() # release - logger.debug("Worker thread {} woke up".format(self._tid)) + # logger.debug("Worker thread {} woke up".format(self._tid)) # Someone will wake us up here self._curStep += 1 # off to a new step... @@ -151,9 +148,15 @@ class WorkerThread: time.sleep(0) # let the released thread run a bit, IMPORTANT, do it after release def doWork(self): - logger.info(" Step {}, thread {}: ".format(self._curStep, self._tid)) + self.logInfo("Thread starting an execution") self._pool.dispatcher.doWork(self) + def logInfo(self, msg): + logger.info(" T[{}.{}]: ".format(self._curStep, self._tid) + msg) + + def logDebug(self, msg): + logger.debug(" T[{}.{}]: ".format(self._curStep, self._tid) + msg) + def execSql(self, sql): if ( gConfig.per_thread_db_connection ): return self._dbConn.execSql(sql) @@ -175,9 +178,8 @@ class SteppingThreadPool: # self.numWaitingThreads = 0 # Thread coordination - self.barrier = threading.Barrier(numThreads + 1) # plus main thread - # self.lock = threading.Lock() # for critical section execution - # self.mainGate = threading.Condition() + self._lock = threading.RLock() # lock to control access (e.g. 
even reading it is dangerous) + self._poolBarrier = threading.Barrier(numThreads + 1) # do nothing before crossing this, except main thread # starting to run all the threads, in locking steps def run(self): @@ -189,11 +191,20 @@ class SteppingThreadPool: # Coordinate all threads step by step self.curStep = -1 # not started yet while(self.curStep < self.maxSteps): + print(".", end="", flush=True) logger.debug("Main thread going to sleep") - self.crossPoolBarrier() - self.barrier.reset() # Other worker threads should now be at the "gate" - logger.debug("Main thread waking up, tapping worker threads".format(self.curStep)) # Now not all threads had time to go to sleep + # Now ready to enter a step + self.crossPoolBarrier() # let other threads go past the pool barrier, but wait at the thread gate + self._poolBarrier.reset() # Other worker threads should now be at the "gate" + + # Rare chance, when all threads should be blocked at the "step gate" for each thread + logger.info("<-- Step {} finished".format(self.curStep)) + self.curStep += 1 # we are about to get into next step. TODO: race condition here! + logger.debug(" ") # line break + logger.debug("--> Step {} starts with main thread waking up".format(self.curStep)) # Now not all threads had time to go to sleep + + logger.debug("Main thread waking up at step {}, tapping worker threads".format(self.curStep)) # Now not all threads had time to go to sleep self.tapAllThreads() # The threads will run through many steps @@ -201,15 +212,11 @@ class SteppingThreadPool: workerThread._thread.join() # slight hack, accessing members logger.info("All threads finished") + print("") + print("Finished") def crossPoolBarrier(self): - if ( self.barrier.n_waiting == self.numThreads ): # everyone else is waiting, inc main thread - logger.info("<-- Step {} finished".format(self.curStep)) - self.curStep += 1 # we are about to get into next step. TODO: race condition here! 
- logger.debug(" ") # line break - logger.debug("--> Step {} starts with main thread waking up".format(self.curStep)) # Now not all threads had time to go to sleep - - self.barrier.wait() + self._poolBarrier.wait() def tapAllThreads(self): # in a deterministic manner wakeSeq = [] @@ -397,46 +404,51 @@ class DbState(): def cleanUp(self): self._dbConn.close() +# A task is a long-living entity, carrying out short-lived "executions" for threads class Task(): def __init__(self, dbState): self.dbState = dbState + def _executeInternal(self, wt): + raise RuntimeError("To be implemeted by child classes") + def execute(self, workerThread): - raise RuntimeError("Must be overriden by child class") + self._executeInternal(workerThread) # TODO: no return value? + workerThread.logDebug("[X] task execution completed") def execSql(self, sql): return self.dbState.execute(sql) class CreateTableTask(Task): - def execute(self, wt): + def _executeInternal(self, wt): tIndex = dbState.addTable() - logger.debug(" Creating a table {} ...".format(tIndex)) + wt.logDebug("Creating a table {} ...".format(tIndex)) wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex)) - logger.debug(" Table {} created.".format(tIndex)) + wt.logDebug("Table {} created.".format(tIndex)) dbState.releaseTable(tIndex) class DropTableTask(Task): - def execute(self, wt): + def _executeInternal(self, wt): tableName = dbState.getTableNameToDelete() if ( not tableName ): # May be "False" - logger.info(" Cannot generate a table to delete, skipping...") + wt.logInfo("Cannot generate a table to delete, skipping...") return - logger.info(" Dropping a table db.{} ...".format(tableName)) + wt.logInfo("Dropping a table db.{} ...".format(tableName)) wt.execSql("drop table db.{}".format(tableName)) class AddDataTask(Task): - def execute(self, wt): + def _executeInternal(self, wt): ds = self.dbState - logger.info(" Adding some data... 
numQueue={}".format(ds.tableNumQueue.toText())) + wt.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText())) tIndex = ds.pickAndAllocateTable() if ( tIndex == None ): - logger.info(" No table found to add data, skipping...") + wt.logInfo("No table found to add data, skipping...") return sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) - logger.debug(" Executing SQL: {}".format(sql)) + wt.logDebug("Executing SQL: {}".format(sql)) wt.execSql(sql) ds.releaseTable(tIndex) - logger.debug(" Finished adding data") + wt.logDebug("Finished adding data") # Deterministic random number generator class Dice(): @@ -510,7 +522,7 @@ if __name__ == "__main__": Dice.seed(0) # initial seeding of dice dbState = DbState() - threadPool = SteppingThreadPool(dbState, 5, 10, 0) + threadPool = SteppingThreadPool(dbState, 5, 500, 0) threadPool.run() logger.info("Finished running thread pool") dbState.cleanUp() From 5d14a745283226b00da32558131349524fb652c4 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Mon, 4 May 2020 22:23:27 -0700 Subject: [PATCH 11/17] Refactored to use ThreadCoordinator and TaskExecutor, added command line parameters --- tests/pytest/crash_gen.py | 340 +++++++++++++++++++++----------------- 1 file changed, 184 insertions(+), 156 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index ee4a68eee5..a5baf8577b 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -1,4 +1,4 @@ -#!/usr/bin/python3 +#!/usr/bin/python3.7 ################################################################### # Copyright (c) 2016 by TAOS Technologies, Inc. # All rights reserved. 
@@ -11,7 +11,13 @@ ################################################################### # -*- coding: utf-8 -*- +from __future__ import annotations # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel + import sys +# Require Python 3 +if sys.version_info[0] < 3: + raise Exception("Must be using Python 3") + import getopt import argparse @@ -25,78 +31,68 @@ from util.dnodes import * from util.cases import * from util.sql import * +import crash_gen import taos +# Global variables, tried to keep a small number. +gConfig = None # Command-line/Environment Configurations, will set a bit later +logger = None -# Command-line/Environment Configurations -gConfig = None # will set a bit later - -def runThread(workerThread): - workerThread.run() - -# Used by one process to block till another is ready -# class Baton: -# def __init__(self): -# self._lock = threading.Lock() # control access to object -# self._baton = threading.Condition() # let thread block -# self._hasGiver = False -# self._hasTaker = False - -# def give(self): -# with self._lock: -# if ( self._hasGiver ): # already? -# raise RuntimeError("Cannot double-give a baton") -# self._hasGiver = True - -# self._settle() # may block, OUTSIDE self lock - -# def take(self): -# with self._lock: -# if ( self._hasTaker): -# raise RuntimeError("Cannot double-take a baton") -# self._hasTaker = True - -# self._settle() - -# def _settle(self): - - +def runThread(wt: WorkerThread): + wt.run() class WorkerThread: - def __init__(self, pool, tid, dbState): # note: main thread context! - self._curStep = -1 + def __init__(self, pool: SteppingThreadPool, tid, dbState, + tc: ThreadCoordinator, + # te: TaskExecutor, + ): # note: main thread context! 
+ # self._curStep = -1 self._pool = pool self._tid = tid self._dbState = dbState + self._tc = tc # self.threadIdent = threading.get_ident() self._thread = threading.Thread(target=runThread, args=(self,)) self._stepGate = threading.Event() # Let us have a DB connection of our own - if ( gConfig.per_thread_db_connection ): - self._dbConn = DbConn() + if ( gConfig.per_thread_db_connection ): # type: ignore + self._dbConn = DbConn() + + def getTaskExecutor(self): + return self._tc.getTaskExecutor() def start(self): self._thread.start() # AFTER the thread is recorded - def run(self): + def run(self): # initialization after thread starts, in the thread context # self.isSleeping = False logger.info("Starting to run thread: {}".format(self._tid)) - if ( gConfig.per_thread_db_connection ): + if ( gConfig.per_thread_db_connection ): # type: ignore self._dbConn.open() - # self._dbConn.resetDb() - - while self._curStep < self._pool.maxSteps: - # stepNo = self.pool.waitForStep() # Step to run - self.crossStepGate() # self.curStep will get incremented - self.doWork() + self._doTaskLoop() + # clean up - if ( gConfig.per_thread_db_connection ): + if ( gConfig.per_thread_db_connection ): # type: ignore self._dbConn.close() + def _doTaskLoop(self) : + # while self._curStep < self._pool.maxSteps: + # tc = ThreadCoordinator(None) + while True: + self._tc.crossStepBarrier() # shared barrier first, INCLUDING the last one + logger.debug("Thread task loop exited barrier...") + self.crossStepGate() # then per-thread gate, after being tapped + logger.debug("Thread task loop exited step gate...") + if not self._tc.isRunning(): + break + + task = self._tc.fetchTask() + task.execute(self) + def verifyThreadSelf(self): # ensure we are called by this own thread if ( threading.get_ident() != self._thread.ident ): raise RuntimeError("Unexpectly called from other threads") @@ -109,53 +105,25 @@ class WorkerThread: if ( not self._thread.is_alive() ): raise RuntimeError("Unexpected dead thread") - # 
def verifyIsSleeping(self, isSleeping): - # if ( isSleeping != self.isSleeping ): - # raise RuntimeError("Unexpected thread sleep status") - # A gate is different from a barrier in that a thread needs to be "tapped" def crossStepGate(self): self.verifyThreadAlive() self.verifyThreadSelf() # only allowed by ourselves - # self.verifyIsSleeping(False) # has to be awake - - # logger.debug("Worker thread {} about to cross pool barrier".format(self._tid)) - # self.isSleeping = True # TODO: maybe too early? - self._pool.crossPoolBarrier() # wait for all other threads # Wait again at the "gate", waiting to be "tapped" # logger.debug("Worker thread {} about to cross the step gate".format(self._tid)) self._stepGate.wait() self._stepGate.clear() - # logger.debug("Worker thread {} woke up".format(self._tid)) - # Someone will wake us up here - self._curStep += 1 # off to a new step... + # self._curStep += 1 # off to a new step... def tapStepGate(self): # give it a tap, release the thread waiting there self.verifyThreadAlive() self.verifyThreadMain() # only allowed for main thread - # self.verifyIsSleeping(True) # has to be sleeping - + logger.debug("Tapping worker thread {}".format(self._tid)) - # self.stepGate.acquire() - # logger.debug("Tapping worker thread {}, lock acquired".format(self.tid)) - self._stepGate.set() # wake up! 
- # logger.debug("Tapping worker thread {}, notified!".format(self.tid)) - # self.isSleeping = False # No race condition for sure - # self.stepGate.release() # this finishes before .wait() can return - # logger.debug("Tapping worker thread {}, lock released".format(self.tid)) - time.sleep(0) # let the released thread run a bit, IMPORTANT, do it after release - - def doWork(self): - self.logInfo("Thread starting an execution") - self._pool.dispatcher.doWork(self) - - def logInfo(self, msg): - logger.info(" T[{}.{}]: ".format(self._curStep, self._tid) + msg) - - def logDebug(self, msg): - logger.debug(" T[{}.{}]: ".format(self._curStep, self._tid) + msg) + self._stepGate.set() # wake up! + time.sleep(0) # let the released thread run a bit def execSql(self, sql): if ( gConfig.per_thread_db_connection ): @@ -163,6 +131,78 @@ class WorkerThread: else: return self._dbState.getDbConn().execSql(sql) +class ThreadCoordinator: + def __init__(self, pool, wd: WorkDispatcher): + self._curStep = -1 # first step is 0 + self._pool = pool + self._wd = wd + self._te = None # prepare for every new step + + self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads + + def getTaskExecutor(self): + return self._te + + def crossStepBarrier(self): + self._stepBarrier.wait() + + def run(self, dbState): + self._pool.createAndStartThreads(dbState, self) + + # Coordinate all threads step by step + self._curStep = -1 # not started yet + maxSteps = gConfig.max_steps # type: ignore + while(self._curStep < maxSteps): + print(".", end="", flush=True) + logger.debug("Main thread going to sleep") + + # Now ready to enter a step + self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate + self._stepBarrier.reset() # Other worker threads should now be at the "gate" + + # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" + logger.info("<-- Step {} finished".format(self._curStep)) 
+ self._curStep += 1 # we are about to get into next step. TODO: race condition here! + logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep + + self._te = TaskExecutor(self._curStep) + + logger.debug("Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep + self.tapAllThreads() + + logger.debug("Main thread ready to finish up...") + self.crossStepBarrier() # Cross it one last time, after all threads finish + self._stepBarrier.reset() + logger.debug("Main thread in exclusive zone...") + self._te = None # No more executor, time to end + logger.debug("Main thread tapping all threads one last time...") + self.tapAllThreads() # Let the threads run one last time + logger.debug("Main thread joining all threads") + self._pool.joinAll() # Get all threads to finish + + logger.info("All threads finished") + print("\r\nFinished") + + def tapAllThreads(self): # in a deterministic manner + wakeSeq = [] + for i in range(self._pool.numThreads): # generate a random sequence + if Dice.throw(2) == 1 : + wakeSeq.append(i) + else: + wakeSeq.insert(0, i) + logger.info("Waking up threads: {}".format(str(wakeSeq))) + # TODO: set dice seed to a deterministic value + for i in wakeSeq: + self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?! + time.sleep(0) # yield + + def isRunning(self): + return self._te != None + + def fetchTask(self) -> Task : + if ( not self.isRunning() ): # no task + raise RuntimeError("Cannot fetch task when not running") + return self._wd.pickTask() # We define a class to run a number of threads in locking steps. 
class SteppingThreadPool: @@ -171,65 +211,24 @@ class SteppingThreadPool: self.maxSteps = maxSteps self.funcSequencer = funcSequencer # Internal class variables - self.dispatcher = WorkDispatcher(self, dbState) + self.dispatcher = WorkDispatcher(dbState) self.curStep = 0 self.threadList = [] # self.stepGate = threading.Condition() # Gate to hold/sync all threads # self.numWaitingThreads = 0 - # Thread coordination - self._lock = threading.RLock() # lock to control access (e.g. even reading it is dangerous) - self._poolBarrier = threading.Barrier(numThreads + 1) # do nothing before crossing this, except main thread - + # starting to run all the threads, in locking steps - def run(self): + def createAndStartThreads(self, dbState, tc: ThreadCoordinator): for tid in range(0, self.numThreads): # Create the threads - workerThread = WorkerThread(self, tid, dbState) + workerThread = WorkerThread(self, tid, dbState, tc) self.threadList.append(workerThread) workerThread.start() # start, but should block immediately before step 0 - # Coordinate all threads step by step - self.curStep = -1 # not started yet - while(self.curStep < self.maxSteps): - print(".", end="", flush=True) - logger.debug("Main thread going to sleep") - - # Now ready to enter a step - self.crossPoolBarrier() # let other threads go past the pool barrier, but wait at the thread gate - self._poolBarrier.reset() # Other worker threads should now be at the "gate" - - # Rare chance, when all threads should be blocked at the "step gate" for each thread - logger.info("<-- Step {} finished".format(self.curStep)) - self.curStep += 1 # we are about to get into next step. TODO: race condition here! 
- logger.debug(" ") # line break - logger.debug("--> Step {} starts with main thread waking up".format(self.curStep)) # Now not all threads had time to go to sleep - - logger.debug("Main thread waking up at step {}, tapping worker threads".format(self.curStep)) # Now not all threads had time to go to sleep - self.tapAllThreads() - - # The threads will run through many steps + def joinAll(self): for workerThread in self.threadList: - workerThread._thread.join() # slight hack, accessing members - - logger.info("All threads finished") - print("") - print("Finished") - - def crossPoolBarrier(self): - self._poolBarrier.wait() - - def tapAllThreads(self): # in a deterministic manner - wakeSeq = [] - for i in range(self.numThreads): # generate a random sequence - if Dice.throw(2) == 1 : - wakeSeq.append(i) - else: - wakeSeq.insert(0, i) - logger.info("Waking up threads: {}".format(str(wakeSeq))) - # TODO: set dice seed to a deterministic value - for i in wakeSeq: - self.threadList[i].tapStepGate() - time.sleep(0) # yield + logger.debug("Joining thread...") + workerThread._thread.join() # A queue of continguous POSITIVE integers class LinearQueue(): @@ -404,51 +403,67 @@ class DbState(): def cleanUp(self): self._dbConn.close() -# A task is a long-living entity, carrying out short-lived "executions" for threads +class TaskExecutor(): + def __init__(self, curStep): + self._curStep = curStep + + def execute(self, task, wt: WorkerThread): # execute a task on a thread + task.execute(self, wt) + + def logInfo(self, msg): + logger.info(" T[{}.x]: ".format(self._curStep) + msg) + + def logDebug(self, msg): + logger.debug(" T[{}.x]: ".format(self._curStep) + msg) + + class Task(): def __init__(self, dbState): self.dbState = dbState - def _executeInternal(self, wt): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): raise RuntimeError("To be implemeted by child classes") - def execute(self, workerThread): - self._executeInternal(workerThread) # TODO: no return value? 
- workerThread.logDebug("[X] task execution completed") + def execute(self, wt: WorkerThread): + wt.verifyThreadSelf() + + te = wt.getTaskExecutor() + self._executeInternal(te, wt) # TODO: no return value? + te.logDebug("[X] task execution completed") def execSql(self, sql): return self.dbState.execute(sql) class CreateTableTask(Task): - def _executeInternal(self, wt): - tIndex = dbState.addTable() - wt.logDebug("Creating a table {} ...".format(tIndex)) + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tIndex = self.dbState.addTable() + te.logDebug("Creating a table {} ...".format(tIndex)) wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex)) - wt.logDebug("Table {} created.".format(tIndex)) - dbState.releaseTable(tIndex) + te.logDebug("Table {} created.".format(tIndex)) + self.dbState.releaseTable(tIndex) class DropTableTask(Task): - def _executeInternal(self, wt): - tableName = dbState.getTableNameToDelete() + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tableName = self.dbState.getTableNameToDelete() if ( not tableName ): # May be "False" - wt.logInfo("Cannot generate a table to delete, skipping...") + te.logInfo("Cannot generate a table to delete, skipping...") return - wt.logInfo("Dropping a table db.{} ...".format(tableName)) + te.logInfo("Dropping a table db.{} ...".format(tableName)) wt.execSql("drop table db.{}".format(tableName)) class AddDataTask(Task): - def _executeInternal(self, wt): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): ds = self.dbState - wt.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText())) + te.logInfo("Adding some data... 
numQueue={}".format(ds.tableNumQueue.toText())) tIndex = ds.pickAndAllocateTable() if ( tIndex == None ): - wt.logInfo("No table found to add data, skipping...") + te.logInfo("No table found to add data, skipping...") return sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) - wt.logDebug("Executing SQL: {}".format(sql)) + te.logDebug("Executing SQL: {}".format(sql)) wt.execSql(sql) ds.releaseTable(tIndex) - wt.logDebug("Finished adding data") + te.logDebug("Finished adding data") # Deterministic random number generator class Dice(): @@ -484,8 +499,7 @@ class Dice(): # Anyone needing to carry out work should simply come here class WorkDispatcher(): - def __init__(self, pool, dbState): - self.pool = pool + def __init__(self, dbState): # self.totalNumMethods = 2 self.tasks = [ CreateTableTask(dbState), @@ -499,31 +513,45 @@ class WorkDispatcher(): # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes)) return dRes - def doWork(self, workerThread): + def pickTask(self): dice = self.throwDice() - task = self.tasks[dice] + return self.tasks[dice] + + def doWork(self, workerThread): + task = self.pickTask() task.execute(workerThread) -if __name__ == "__main__": +def main(): # Super cool Python argument library: https://docs.python.org/3/library/argparse.html parser = argparse.ArgumentParser(description='TDengine Auto Crash Generator') parser.add_argument('-p', '--per-thread-db-connection', action='store_true', help='Use a single shared db connection (default: false)') parser.add_argument('-d', '--debug', action='store_true', help='Turn on DEBUG mode for more logging (default: false)') + parser.add_argument('-s', '--max-steps', action='store', default=100, type=int, + help='Maximum number of steps to run (default: 100)') + parser.add_argument('-t', '--num-threads', action='store', default=10, type=int, + help='Number of threads to run (default: 10)') + global gConfig gConfig = parser.parse_args() 
+ global logger logger = logging.getLogger('myApp') if ( gConfig.debug ): logger.setLevel(logging.DEBUG) # default seems to be INFO ch = logging.StreamHandler() logger.addHandler(ch) - Dice.seed(0) # initial seeding of dice dbState = DbState() - threadPool = SteppingThreadPool(dbState, 5, 500, 0) - threadPool.run() - logger.info("Finished running thread pool") + Dice.seed(0) # initial seeding of dice + tc = ThreadCoordinator( + SteppingThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0), + WorkDispatcher(dbState) + ) + tc.run(dbState) dbState.cleanUp() - + logger.info("Finished running thread pool") + +if __name__ == "__main__": + main() From f08e9b082d3eb4f8198f8f0ac83a3364b63e559d Mon Sep 17 00:00:00 2001 From: Steven Li Date: Tue, 5 May 2020 00:20:29 -0700 Subject: [PATCH 12/17] now able to fan out conflicting tasks, and detect incorrect results --- tests/pytest/crash_gen.py | 166 ++++++++++++++++++++++++++++++-------- 1 file changed, 134 insertions(+), 32 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index a5baf8577b..b0a13b943b 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -20,12 +20,15 @@ if sys.version_info[0] < 3: import getopt import argparse +import copy import threading import random import logging import datetime +from typing import List + from util.log import * from util.dnodes import * from util.cases import * @@ -42,14 +45,13 @@ def runThread(wt: WorkerThread): wt.run() class WorkerThread: - def __init__(self, pool: SteppingThreadPool, tid, dbState, + def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator, # te: TaskExecutor, ): # note: main thread context! 
# self._curStep = -1 self._pool = pool - self._tid = tid - self._dbState = dbState + self._tid = tid self._tc = tc # self.threadIdent = threading.get_ident() self._thread = threading.Thread(target=runThread, args=(self,)) @@ -82,16 +84,18 @@ class WorkerThread: def _doTaskLoop(self) : # while self._curStep < self._pool.maxSteps: # tc = ThreadCoordinator(None) - while True: - self._tc.crossStepBarrier() # shared barrier first, INCLUDING the last one + while True: + tc = self._tc # Thread Coordinator, the overall master + tc.crossStepBarrier() # shared barrier first, INCLUDING the last one logger.debug("Thread task loop exited barrier...") self.crossStepGate() # then per-thread gate, after being tapped logger.debug("Thread task loop exited step gate...") if not self._tc.isRunning(): break - task = self._tc.fetchTask() + task = tc.fetchTask() task.execute(self) + tc.saveExecutedTask(task) def verifyThreadSelf(self): # ensure we are called by this own thread if ( threading.get_ident() != self._thread.ident ): @@ -129,25 +133,31 @@ class WorkerThread: if ( gConfig.per_thread_db_connection ): return self._dbConn.execSql(sql) else: - return self._dbState.getDbConn().execSql(sql) + return self._tc.getDbState.getDbConn().execSql(sql) class ThreadCoordinator: - def __init__(self, pool, wd: WorkDispatcher): + def __init__(self, pool, wd: WorkDispatcher, dbState): self._curStep = -1 # first step is 0 self._pool = pool self._wd = wd self._te = None # prepare for every new step + self._dbState = dbState + self._executedTasks: List[Task] = [] # in a given step + self._lock = threading.RLock() # sync access for a few things self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads def getTaskExecutor(self): return self._te + def getDbState(self) -> DbState : + return self._dbState + def crossStepBarrier(self): self._stepBarrier.wait() - def run(self, dbState): - self._pool.createAndStartThreads(dbState, self) + def run(self): + 
self._pool.createAndStartThreads(self) # Coordinate all threads step by step self._curStep = -1 # not started yet @@ -161,10 +171,14 @@ class ThreadCoordinator: self._stepBarrier.reset() # Other worker threads should now be at the "gate" # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" + self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state + + # Get ready for next step logger.info("<-- Step {} finished".format(self._curStep)) self._curStep += 1 # we are about to get into next step. TODO: race condition here! logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep + # A new TE for the new step self._te = TaskExecutor(self._curStep) logger.debug("Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep @@ -202,10 +216,19 @@ class ThreadCoordinator: def fetchTask(self) -> Task : if ( not self.isRunning() ): # no task raise RuntimeError("Cannot fetch task when not running") - return self._wd.pickTask() + # return self._wd.pickTask() + # Alternatively, let's ask the DbState for the appropriate task + dbState = self.getDbState() + tasks = dbState.getTasksAtState() + i = Dice.throw(len(tasks)) + return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. + + def saveExecutedTask(self, task): + with self._lock: + self._executedTasks.append(task) # We define a class to run a number of threads in locking steps. 
-class SteppingThreadPool: +class ThreadPool: def __init__(self, dbState, numThreads, maxSteps, funcSequencer): self.numThreads = numThreads self.maxSteps = maxSteps @@ -215,13 +238,12 @@ class SteppingThreadPool: self.curStep = 0 self.threadList = [] # self.stepGate = threading.Condition() # Gate to hold/sync all threads - # self.numWaitingThreads = 0 - + # self.numWaitingThreads = 0 # starting to run all the threads, in locking steps - def createAndStartThreads(self, dbState, tc: ThreadCoordinator): + def createAndStartThreads(self, tc: ThreadCoordinator): for tid in range(0, self.numThreads): # Create the threads - workerThread = WorkerThread(self, tid, dbState, tc) + workerThread = WorkerThread(self, tid, tc) self.threadList.append(workerThread) workerThread.start() # start, but should block immediately before step 0 @@ -263,9 +285,6 @@ class LinearQueue(): if ( index in self.inUse ): return False - # if ( index in self.inUse ): - # self.inUse.remove(index) # TODO: what about discard? - self.firstIndex += 1 return index @@ -337,7 +356,8 @@ class DbConn: # self._tdSql.prepare() # Recreate database, etc. 
self._cursor.execute('drop database if exists db') - self._cursor.execute('create database db') + logger.debug("Resetting DB, dropped database") + # self._cursor.execute('create database db') # self._cursor.execute('use db') # tdSql.execute('show databases') @@ -355,16 +375,24 @@ class DbConn: # State of the database as we believe it to be class DbState(): + STATE_INVALID = -1 + STATE_EMPTY = 1 # nothing there, no even a DB + STATE_DB_ONLY = 2 # we have a DB, but nothing else + STATE_TABLE_ONLY = 3 # we have a table, but totally empty + STATE_HAS_DATA = 4 # we have some data in the table + def __init__(self): self.tableNumQueue = LinearQueue() self._lastTick = datetime.datetime(2019, 1, 1) # initial date time tick self._lastInt = 0 # next one is initial integer self._lock = threading.RLock() - + self._state = self.STATE_INVALID + # self.openDbServerConnection() self._dbConn = DbConn() self._dbConn.open() self._dbConn.resetDb() # drop and recreate DB + self._state = self.STATE_EMPTY # initial state, the result of above def getDbConn(self): return self._dbConn @@ -403,12 +431,63 @@ class DbState(): def cleanUp(self): self._dbConn.close() + def getTasksAtState(self): + if ( self._state == self.STATE_EMPTY ): + return [CreateDbTask(self), CreateTableTask(self)] + elif ( self._state == self.STATE_DB_ONLY ): + return [DeleteDbTask(self), CreateTableTask(self), AddDataTask(self)] + else: + raise RuntimeError("Unexpected DbState state: {}".format(self._state)) + + def transition(self, tasks): + if ( len(tasks) == 0 ): # before 1st step, or otherwise empty + return # do nothing + if ( self._state == self.STATE_EMPTY ): + self.assertAtMostOneSuccess(tasks, CreateDbTask) # param is class + self.assertIfExistThenSuccess(tasks, CreateDbTask) + self.assertAtMostOneSuccess(tasks, CreateTableTask) + if ( self.hasSuccess(tasks, CreateDbTask) ): + self._state = self.STATE_DB_ONLY + if ( self.hasSuccess(tasks, CreateTableTask) ): + self._state = self.STATE_TABLE_ONLY + else: + raise 
RuntimeError("Unexpected DbState state: {}".format(self._state)) + logger.debug("New DB state is: {}".format(self._state)) + + def assertAtMostOneSuccess(self, tasks, cls): + sCnt = 0 + for task in tasks : + if not isinstance(task, cls): + continue + if task.isSuccess(): + sCnt += 1 + if ( sCnt >= 2 ): + raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls)) + + def assertIfExistThenSuccess(self, tasks, cls): + sCnt = 0 + for task in tasks : + if not isinstance(task, cls): + continue + if task.isSuccess(): + sCnt += 1 + if ( sCnt <= 0 ): + raise RuntimeError("Unexpected zero success for task: {}".format(cls)) + + def hasSuccess(self, tasks, cls): + for task in tasks : + if not isinstance(task, cls): + continue + if task.isSuccess(): + return True + return False + class TaskExecutor(): def __init__(self, curStep): self._curStep = curStep - def execute(self, task, wt: WorkerThread): # execute a task on a thread - task.execute(self, wt) + def execute(self, task: Task, wt: WorkerThread): # execute a task on a thread + task.execute(wt) def logInfo(self, msg): logger.info(" T[{}.x]: ".format(self._curStep) + msg) @@ -416,10 +495,13 @@ class TaskExecutor(): def logDebug(self, msg): logger.debug(" T[{}.x]: ".format(self._curStep) + msg) - class Task(): def __init__(self, dbState): self.dbState = dbState + self._err = None + + def isSuccess(self): + return self._err == None def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): raise RuntimeError("To be implemeted by child classes") @@ -428,12 +510,31 @@ class Task(): wt.verifyThreadSelf() te = wt.getTaskExecutor() - self._executeInternal(te, wt) # TODO: no return value? + te.logDebug("[-] executing task {}...".format(self.__class__.__name__)) + + self._err = None + try: + self._executeInternal(te, wt) # TODO: no return value? 
+ except taos.error.ProgrammingError as err: + te.logDebug("[=]Taos Execution exception: {0}".format(err)) + self._err = err + except: + te.logDebug("[=]Unexpected exception") + raise + te.logDebug("[X] task execution completed") def execSql(self, sql): return self.dbState.execute(sql) +class CreateDbTask(Task): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + wt.execSql("create database db") + +class DeleteDbTask(Task): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + wt.execSql("drop database db") + class CreateTableTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): tIndex = self.dbState.addTable() @@ -487,14 +588,14 @@ class Dice(): raise RuntimeError("System RNG is not deterministic") @classmethod - def throw(cls, max): # get 0 to max-1 - return cls.throwRange(0, max) + def throw(cls, stop): # get 0 to stop-1 + return cls.throwRange(0, stop) @classmethod - def throwRange(cls, min, max): # up to max-1 + def throwRange(cls, start, stop): # up to stop-1 if ( not cls.seeded ): raise RuntimeError("Cannot throw dice before seeding it") - return random.randrange(min, max) + return random.randrange(start, stop) # Anyone needing to carry out work should simply come here @@ -546,10 +647,11 @@ def main(): dbState = DbState() Dice.seed(0) # initial seeding of dice tc = ThreadCoordinator( - SteppingThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0), - WorkDispatcher(dbState) + ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0), + WorkDispatcher(dbState), + dbState ) - tc.run(dbState) + tc.run() dbState.cleanUp() logger.info("Finished running thread pool") From a3671929fb6b14a0846688363400ac379dec99e4 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Fri, 8 May 2020 00:36:00 -0700 Subject: [PATCH 13/17] successfully discovered TD-250 crash bug, with params: -p -d -s 3 -t 3 --- tests/pytest/crash_gen.py | 117 ++++++++++++++++++++++++++++++++------ 1 file changed, 99 insertions(+), 18 
deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index b0a13b943b..ccc90708d3 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -405,6 +405,9 @@ class DbState(): tIndex = self.tableNumQueue.push() return tIndex + def getFixedTableName(self): + return "fixed_table" + def releaseTable(self, i): # return the table back, so others can use it self.tableNumQueue.release(i) @@ -433,9 +436,13 @@ class DbState(): def getTasksAtState(self): if ( self._state == self.STATE_EMPTY ): - return [CreateDbTask(self), CreateTableTask(self)] + return [CreateDbTask(self), CreateFixedTableTask(self)] elif ( self._state == self.STATE_DB_ONLY ): - return [DeleteDbTask(self), CreateTableTask(self), AddDataTask(self)] + return [DropDbTask(self), CreateFixedTableTask(self), AddFixedDataTask(self)] + elif ( self._state == self.STATE_TABLE_ONLY ): + return [DropFixedTableTask(self), AddFixedDataTask(self)] + elif ( self._state == self.STATE_HAS_DATA ) : # same as above. 
TODO: adjust + return [DropFixedTableTask(self), AddFixedDataTask(self)] else: raise RuntimeError("Unexpected DbState state: {}".format(self._state)) @@ -443,13 +450,58 @@ class DbState(): if ( len(tasks) == 0 ): # before 1st step, or otherwise empty return # do nothing if ( self._state == self.STATE_EMPTY ): - self.assertAtMostOneSuccess(tasks, CreateDbTask) # param is class - self.assertIfExistThenSuccess(tasks, CreateDbTask) - self.assertAtMostOneSuccess(tasks, CreateTableTask) if ( self.hasSuccess(tasks, CreateDbTask) ): + self.assertAtMostOneSuccess(tasks, CreateDbTask) # param is class self._state = self.STATE_DB_ONLY - if ( self.hasSuccess(tasks, CreateTableTask) ): - self._state = self.STATE_TABLE_ONLY + if ( self.hasSuccess(tasks, CreateFixedTableTask )): + self._state = self.STATE_TABLE_ONLY + # else: # no successful table creation, not much we can say, as it is step 2 + else: # did not create db + self.assertNoTask(tasks, CreateDbTask) # because we did not have such task + # self.assertNoSuccess(tasks, CreateDbTask) # not necessary, since we just verified no such task + self.assertNoSuccess(tasks, CreateFixedTableTask) + + elif ( self._state == self.STATE_DB_ONLY ): + self.assertAtMostOneSuccess(tasks, DropDbTask) + self.assertIfExistThenSuccess(tasks, DropDbTask) + self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) + # Nothing to be said about adding data task + if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB + # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess + self.assertAtMostOneSuccess(tasks, DropDbTask) + self._state = self.STATE_EMPTY + elif ( self.hasSuccess(tasks, CreateFixedTableTask) ): # did not drop db, create table success + # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table + self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # at most 1 attempt is successful + self.assertNoTask(tasks, DropDbTask) # should have have tried + if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # 
just created table, no data yet + # can't say there's add-data attempts, since they may all fail + self._state = self.STATE_TABLE_ONLY + else: + self._state = self.STATE_HAS_DATA + else: # no success in dropping db tasks, no success in create fixed table, not acceptable + raise RuntimeError("Unexpected no-success scenario") + + elif ( self._state == self.STATE_TABLE_ONLY ): + if ( self.hasSuccess(tasks, DropFixedTableTask) ): + self.assertAtMostOneSuccess(tasks, DropFixedTableTask) + self._state = self.STATE_DB_ONLY + elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table + self.assertNoTask(tasks, DropFixedTableTask) + self._state = self.STATE_HAS_DATA + else: # did not drop table, did not insert data, that is impossible + raise RuntimeError("Unexpected no-success scenarios") + + elif ( self._state == self.STATE_TABLE_ONLY ): # Same as above, TODO: adjust + if ( self.hasSuccess(tasks, DropFixedTableTask) ): + self.assertAtMostOneSuccess(tasks, DropFixedTableTask) + self._state = self.STATE_DB_ONLY + elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table + self.assertNoTask(tasks, DropFixedTableTask) + self._state = self.STATE_HAS_DATA + else: # did not drop table, did not insert data, that is impossible + raise RuntimeError("Unexpected no-success scenarios") + else: raise RuntimeError("Unexpected DbState state: {}".format(self._state)) logger.debug("New DB state is: {}".format(self._state)) @@ -466,14 +518,27 @@ class DbState(): def assertIfExistThenSuccess(self, tasks, cls): sCnt = 0 + exists = False for task in tasks : if not isinstance(task, cls): continue + exists = True # we have a valid instance if task.isSuccess(): sCnt += 1 - if ( sCnt <= 0 ): + if ( exists and sCnt <= 0 ): raise RuntimeError("Unexpected zero success for task: {}".format(cls)) + def assertNoTask(self, tasks, cls): + for task in tasks : + if isinstance(task, cls): + raise RuntimeError("Unexpected task: {}".format(cls)) + + def 
assertNoSuccess(self, tasks, cls): + for task in tasks : + if isinstance(task, cls): + if task.isSuccess(): + raise RuntimeError("Unexpected successful task: {}".format(cls)) + def hasSuccess(self, tasks, cls): for task in tasks : if not isinstance(task, cls): @@ -496,15 +561,15 @@ class TaskExecutor(): logger.debug(" T[{}.x]: ".format(self._curStep) + msg) class Task(): - def __init__(self, dbState): - self.dbState = dbState + def __init__(self, dbState: DbState): + self._dbState = dbState self._err = None def isSuccess(self): return self._err == None def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - raise RuntimeError("To be implemeted by child classes") + raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__)) def execute(self, wt: WorkerThread): wt.verifyThreadSelf() @@ -522,39 +587,49 @@ class Task(): te.logDebug("[=]Unexpected exception") raise - te.logDebug("[X] task execution completed") + te.logDebug("[X] task execution completed, status: {}".format(self.isSuccess())) def execSql(self, sql): - return self.dbState.execute(sql) + return self._dbState.execute(sql) class CreateDbTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): wt.execSql("create database db") -class DeleteDbTask(Task): +class DropDbTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): wt.execSql("drop database db") class CreateTableTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tIndex = self.dbState.addTable() + tIndex = self._dbState.addTable() te.logDebug("Creating a table {} ...".format(tIndex)) wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex)) te.logDebug("Table {} created.".format(tIndex)) - self.dbState.releaseTable(tIndex) + self._dbState.releaseTable(tIndex) + +class CreateFixedTableTask(Task): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tblName = self._dbState.getFixedTableName() + 
wt.execSql("create table db.{} (ts timestamp, speed int)".format(tblName)) class DropTableTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tableName = self.dbState.getTableNameToDelete() + tableName = self._dbState.getTableNameToDelete() if ( not tableName ): # May be "False" te.logInfo("Cannot generate a table to delete, skipping...") return te.logInfo("Dropping a table db.{} ...".format(tableName)) wt.execSql("drop table db.{}".format(tableName)) + +class DropFixedTableTask(Task): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tblName = self._dbState.getFixedTableName() + wt.execSql("drop table db.{}".format(tblName)) class AddDataTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - ds = self.dbState + ds = self._dbState te.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText())) tIndex = ds.pickAndAllocateTable() if ( tIndex == None ): @@ -566,6 +641,12 @@ class AddDataTask(Task): ds.releaseTable(tIndex) te.logDebug("Finished adding data") +class AddFixedDataTask(Task): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + ds = self._dbState + sql = "insert into db.table_{} values ('{}', {});".format(ds.getFixedTableName(), ds.getNextTick(), ds.getNextInt()) + wt.execSql(sql) + # Deterministic random number generator class Dice(): seeded = False # static, uninitialized From 6623cb6395209d023d8633678408288ebe0bf2d4 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Fri, 8 May 2020 16:46:34 -0700 Subject: [PATCH 14/17] Discovered TD-255 crash with params: -p -s 50 -t 10 --- .gitignore | 1 + tests/pytest/crash_gen.py | 74 ++++++++++++++++++++++++++++++--------- 2 files changed, 58 insertions(+), 17 deletions(-) diff --git a/.gitignore b/.gitignore index 806663987a..9772284ef1 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ debs/ rpms/ mac/ *.pyc +.mypy_cache *.tmp *.swp src/connector/nodejs/node_modules/ diff --git a/tests/pytest/crash_gen.py 
b/tests/pytest/crash_gen.py index ccc90708d3..431de7c56a 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -61,6 +61,13 @@ class WorkerThread: if ( gConfig.per_thread_db_connection ): # type: ignore self._dbConn = DbConn() + def logDebug(self, msg): + logger.info(" t[{}] {}".format(self._tid, msg)) + + def logInfo(self, msg): + logger.info(" t[{}] {}".format(self._tid, msg)) + + def getTaskExecutor(self): return self._tc.getTaskExecutor() @@ -172,6 +179,7 @@ class ThreadCoordinator: # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state + self.resetExecutedTasks() # clear the tasks after we are done # Get ready for next step logger.info("<-- Step {} finished".format(self._curStep)) @@ -221,7 +229,11 @@ class ThreadCoordinator: dbState = self.getDbState() tasks = dbState.getTasksAtState() i = Dice.throw(len(tasks)) - return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. + # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. 
+ return tasks[i].clone() + + def resetExecutedTasks(self): + self._executedTasks = [] # should be under single thread def saveExecutedTask(self, task): with self._lock: @@ -512,6 +524,7 @@ class DbState(): if not isinstance(task, cls): continue if task.isSuccess(): + task.logDebug("Task success found") sCnt += 1 if ( sCnt >= 2 ): raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls)) @@ -551,43 +564,70 @@ class TaskExecutor(): def __init__(self, curStep): self._curStep = curStep + def getCurStep(self): + return self._curStep + def execute(self, task: Task, wt: WorkerThread): # execute a task on a thread task.execute(wt) - def logInfo(self, msg): - logger.info(" T[{}.x]: ".format(self._curStep) + msg) + # def logInfo(self, msg): + # logger.info(" T[{}.x]: ".format(self._curStep) + msg) - def logDebug(self, msg): - logger.debug(" T[{}.x]: ".format(self._curStep) + msg) + # def logDebug(self, msg): + # logger.debug(" T[{}.x]: ".format(self._curStep) + msg) class Task(): + taskSn = 100 + + @classmethod + def allocTaskNum(cls): + cls.taskSn += 1 + return cls.taskSn + def __init__(self, dbState: DbState): self._dbState = dbState + self._workerThread = None self._err = None + self._curStep = None + + # Assign an incremental task serial number + self._taskNum = self.allocTaskNum() def isSuccess(self): return self._err == None + def clone(self): + newTask = self.__class__(self._dbState) + return newTask + + def logDebug(self, msg): + self._workerThread.logDebug("s[{}.{}] {}".format(self._curStep, self._taskNum, msg)) + + def logInfo(self, msg): + self._workerThread.logInfo("s[{}.{}] {}".format(self._curStep, self._taskNum, msg)) + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__)) def execute(self, wt: WorkerThread): wt.verifyThreadSelf() + self._workerThread = wt # type: ignore te = wt.getTaskExecutor() - te.logDebug("[-] executing 
task {}...".format(self.__class__.__name__)) + self._curStep = te.getCurStep() + self.logDebug("[-] executing task {}...".format(self.__class__.__name__)) self._err = None try: self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: - te.logDebug("[=]Taos Execution exception: {0}".format(err)) + self.logDebug("[=]Taos Execution exception: {0}".format(err)) self._err = err except: - te.logDebug("[=]Unexpected exception") + self.logDebug("[=]Unexpected exception") raise - te.logDebug("[X] task execution completed, status: {}".format(self.isSuccess())) + self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure")) def execSql(self, sql): return self._dbState.execute(sql) @@ -603,9 +643,9 @@ class DropDbTask(Task): class CreateTableTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): tIndex = self._dbState.addTable() - te.logDebug("Creating a table {} ...".format(tIndex)) + self.logDebug("Creating a table {} ...".format(tIndex)) wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex)) - te.logDebug("Table {} created.".format(tIndex)) + self.logDebug("Table {} created.".format(tIndex)) self._dbState.releaseTable(tIndex) class CreateFixedTableTask(Task): @@ -617,9 +657,9 @@ class DropTableTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): tableName = self._dbState.getTableNameToDelete() if ( not tableName ): # May be "False" - te.logInfo("Cannot generate a table to delete, skipping...") + self.logInfo("Cannot generate a table to delete, skipping...") return - te.logInfo("Dropping a table db.{} ...".format(tableName)) + self.logInfo("Dropping a table db.{} ...".format(tableName)) wt.execSql("drop table db.{}".format(tableName)) class DropFixedTableTask(Task): @@ -630,16 +670,16 @@ class DropFixedTableTask(Task): class AddDataTask(Task): def _executeInternal(self, te: TaskExecutor, 
wt: WorkerThread): ds = self._dbState - te.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText())) + self.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText())) tIndex = ds.pickAndAllocateTable() if ( tIndex == None ): - te.logInfo("No table found to add data, skipping...") + self.logInfo("No table found to add data, skipping...") return sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) - te.logDebug("Executing SQL: {}".format(sql)) + self.logDebug("Executing SQL: {}".format(sql)) wt.execSql(sql) ds.releaseTable(tIndex) - te.logDebug("Finished adding data") + self.logDebug("Finished adding data") class AddFixedDataTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): From dcb1155481a5c28bfc40ea14ccae450c2e78e17e Mon Sep 17 00:00:00 2001 From: Steven Li Date: Fri, 8 May 2020 17:40:43 -0700 Subject: [PATCH 15/17] Discovered TD-256 with params: -p -d -t 10 -s 20 --- tests/pytest/crash_gen.py | 50 ++++++++++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 431de7c56a..6a724709e2 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -136,11 +136,17 @@ class WorkerThread: self._stepGate.set() # wake up! 
time.sleep(0) # let the released thread run a bit - def execSql(self, sql): + def execSql(self, sql): # not "execute", since we are out side the DB context if ( gConfig.per_thread_db_connection ): - return self._dbConn.execSql(sql) + return self._dbConn.execute(sql) else: - return self._tc.getDbState.getDbConn().execSql(sql) + return self._tc.getDbState.getDbConn().execute(sql) + + def querySql(self, sql): # not "execute", since we are out side the DB context + if ( gConfig.per_thread_db_connection ): + return self._dbConn.query(sql) + else: + return self._tc.getDbState.getDbConn().query(sql) class ThreadCoordinator: def __init__(self, pool, wd: WorkDispatcher, dbState): @@ -380,10 +386,16 @@ class DbConn: self._tdSql.close() self.isOpen = False - def execSql(self, sql): + def execute(self, sql): + if ( not self.isOpen ): + raise RuntimeError("Cannot execute database commands until connection is open") + return self._tdSql.execute(sql) + + def query(self, sql) -> int : # return number of rows retrieved if ( not self.isOpen ): raise RuntimeError("Cannot query database until connection is open") - return self._tdSql.execute(sql) + return self._tdSql.query(sql) + # State of the database as we believe it to be class DbState(): @@ -441,27 +453,36 @@ class DbState(): return "table_{}".format(tblNum) def execSql(self, sql): # using the main DB connection - return self._dbConn.execSql(sql) + return self._dbConn.execute(sql) def cleanUp(self): self._dbConn.close() def getTasksAtState(self): + tasks = [] + tasks.append(ReadFixedDataTask(self)) # always if ( self._state == self.STATE_EMPTY ): - return [CreateDbTask(self), CreateFixedTableTask(self)] + tasks.append(CreateDbTask(self)) + tasks.append(CreateFixedTableTask(self)) elif ( self._state == self.STATE_DB_ONLY ): - return [DropDbTask(self), CreateFixedTableTask(self), AddFixedDataTask(self)] + tasks.append(DropDbTask(self)) + tasks.append(CreateFixedTableTask(self)) + tasks.append(AddFixedDataTask(self)) elif ( 
self._state == self.STATE_TABLE_ONLY ): - return [DropFixedTableTask(self), AddFixedDataTask(self)] + tasks.append(DropFixedTableTask(self)) + tasks.append(AddFixedDataTask(self)) elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust - return [DropFixedTableTask(self), AddFixedDataTask(self)] + tasks.append(DropFixedTableTask(self)) + tasks.append(AddFixedDataTask(self)) else: raise RuntimeError("Unexpected DbState state: {}".format(self._state)) + return tasks def transition(self, tasks): if ( len(tasks) == 0 ): # before 1st step, or otherwise empty return # do nothing if ( self._state == self.STATE_EMPTY ): + # self.assertNoSuccess(tasks, ReadFixedDataTask) # some read may be successful, since we might be creating a table if ( self.hasSuccess(tasks, CreateDbTask) ): self.assertAtMostOneSuccess(tasks, CreateDbTask) # param is class self._state = self.STATE_DB_ONLY @@ -504,7 +525,7 @@ class DbState(): else: # did not drop table, did not insert data, that is impossible raise RuntimeError("Unexpected no-success scenarios") - elif ( self._state == self.STATE_TABLE_ONLY ): # Same as above, TODO: adjust + elif ( self._state == self.STATE_HAS_DATA ): # Same as above, TODO: adjust if ( self.hasSuccess(tasks, DropFixedTableTask) ): self.assertAtMostOneSuccess(tasks, DropFixedTableTask) self._state = self.STATE_DB_ONLY @@ -589,6 +610,7 @@ class Task(): self._workerThread = None self._err = None self._curStep = None + self._numRows = None # Number of rows affected # Assign an incremental task serial number self._taskNum = self.allocTaskNum() @@ -653,6 +675,12 @@ class CreateFixedTableTask(Task): tblName = self._dbState.getFixedTableName() wt.execSql("create table db.{} (ts timestamp, speed int)".format(tblName)) +class ReadFixedDataTask(Task): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tblName = self._dbState.getFixedTableName() + self._numRows = wt.querySql("select * from db.{}".format(tblName)) # save the result for later + # 
tdSql.query(" cars where tbname in ('carzero', 'carone')") + class DropTableTask(Task): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): tableName = self._dbState.getTableNameToDelete() From e1ee02e326428991123ade43b10d85be2f861fe0 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Mon, 18 May 2020 17:43:37 -0700 Subject: [PATCH 16/17] Ready to merge crash_gen into develop branch --- tests/pytest/crash_gen.py | 31 +- tests/pytest/crash_gen_0519.py | 831 +++++++++++++++++++++++++++++++++ tests/pytest/crash_gen_0519.sh | 41 ++ 3 files changed, 899 insertions(+), 4 deletions(-) create mode 100755 tests/pytest/crash_gen_0519.py create mode 100755 tests/pytest/crash_gen_0519.sh diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 6a724709e2..44c978957e 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -26,6 +26,7 @@ import threading import random import logging import datetime +import textwrap from typing import List @@ -140,13 +141,13 @@ class WorkerThread: if ( gConfig.per_thread_db_connection ): return self._dbConn.execute(sql) else: - return self._tc.getDbState.getDbConn().execute(sql) + return self._tc.getDbState().getDbConn().execute(sql) def querySql(self, sql): # not "execute", since we are out side the DB context if ( gConfig.per_thread_db_connection ): return self._dbConn.query(sql) else: - return self._tc.getDbState.getDbConn().query(sql) + return self._tc.getDbState().getDbConn().query(sql) class ThreadCoordinator: def __init__(self, pool, wd: WorkDispatcher, dbState): @@ -414,7 +415,18 @@ class DbState(): # self.openDbServerConnection() self._dbConn = DbConn() - self._dbConn.open() + try: + self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected + except taos.error.ProgrammingError as err: + # print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err)) + if ( err.msg == 'disconnected' ): # cannot open DB connection + print("Cannot establish DB connection, please re-run 
script without parameter, and follow the instructions.") + sys.exit() + else: + raise + except: + print("[=]Unexpected exception") + raise self._dbConn.resetDb() # drop and recreate DB self._state = self.STATE_EMPTY # initial state, the result of above @@ -773,7 +785,15 @@ class WorkDispatcher(): def main(): # Super cool Python argument library: https://docs.python.org/3/library/argparse.html - parser = argparse.ArgumentParser(description='TDengine Auto Crash Generator') + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + description=textwrap.dedent('''\ + TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below) + --------------------------------------------------------------------- + 1. You build TDengine in the top level ./build directory, as described in offical docs + 2. You run the server there before this script: ./build/bin/taosd -c test/cfg + + ''')) parser.add_argument('-p', '--per-thread-db-connection', action='store_true', help='Use a single shared db connection (default: false)') parser.add_argument('-d', '--debug', action='store_true', @@ -785,6 +805,9 @@ def main(): global gConfig gConfig = parser.parse_args() + if len(sys.argv) == 1: + parser.print_help() + sys.exit() global logger logger = logging.getLogger('myApp') diff --git a/tests/pytest/crash_gen_0519.py b/tests/pytest/crash_gen_0519.py new file mode 100755 index 0000000000..44c978957e --- /dev/null +++ b/tests/pytest/crash_gen_0519.py @@ -0,0 +1,831 @@ +#!/usr/bin/python3.7 +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. 
+# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- +from __future__ import annotations # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel + +import sys +# Require Python 3 +if sys.version_info[0] < 3: + raise Exception("Must be using Python 3") + +import getopt +import argparse +import copy + +import threading +import random +import logging +import datetime +import textwrap + +from typing import List + +from util.log import * +from util.dnodes import * +from util.cases import * +from util.sql import * + +import crash_gen +import taos + +# Global variables, tried to keep a small number. +gConfig = None # Command-line/Environment Configurations, will set a bit later +logger = None + +def runThread(wt: WorkerThread): + wt.run() + +class WorkerThread: + def __init__(self, pool: ThreadPool, tid, + tc: ThreadCoordinator, + # te: TaskExecutor, + ): # note: main thread context! 
+ # self._curStep = -1 + self._pool = pool + self._tid = tid + self._tc = tc + # self.threadIdent = threading.get_ident() + self._thread = threading.Thread(target=runThread, args=(self,)) + self._stepGate = threading.Event() + + # Let us have a DB connection of our own + if ( gConfig.per_thread_db_connection ): # type: ignore + self._dbConn = DbConn() + + def logDebug(self, msg): + logger.info(" t[{}] {}".format(self._tid, msg)) + + def logInfo(self, msg): + logger.info(" t[{}] {}".format(self._tid, msg)) + + + def getTaskExecutor(self): + return self._tc.getTaskExecutor() + + def start(self): + self._thread.start() # AFTER the thread is recorded + + def run(self): + # initialization after thread starts, in the thread context + # self.isSleeping = False + logger.info("Starting to run thread: {}".format(self._tid)) + + if ( gConfig.per_thread_db_connection ): # type: ignore + self._dbConn.open() + + self._doTaskLoop() + + # clean up + if ( gConfig.per_thread_db_connection ): # type: ignore + self._dbConn.close() + + def _doTaskLoop(self) : + # while self._curStep < self._pool.maxSteps: + # tc = ThreadCoordinator(None) + while True: + tc = self._tc # Thread Coordinator, the overall master + tc.crossStepBarrier() # shared barrier first, INCLUDING the last one + logger.debug("Thread task loop exited barrier...") + self.crossStepGate() # then per-thread gate, after being tapped + logger.debug("Thread task loop exited step gate...") + if not self._tc.isRunning(): + break + + task = tc.fetchTask() + task.execute(self) + tc.saveExecutedTask(task) + + def verifyThreadSelf(self): # ensure we are called by this own thread + if ( threading.get_ident() != self._thread.ident ): + raise RuntimeError("Unexpectly called from other threads") + + def verifyThreadMain(self): # ensure we are called by the main thread + if ( threading.get_ident() != threading.main_thread().ident ): + raise RuntimeError("Unexpectly called from other threads") + + def verifyThreadAlive(self): + if ( not 
self._thread.is_alive() ): + raise RuntimeError("Unexpected dead thread") + + # A gate is different from a barrier in that a thread needs to be "tapped" + def crossStepGate(self): + self.verifyThreadAlive() + self.verifyThreadSelf() # only allowed by ourselves + + # Wait again at the "gate", waiting to be "tapped" + # logger.debug("Worker thread {} about to cross the step gate".format(self._tid)) + self._stepGate.wait() + self._stepGate.clear() + + # self._curStep += 1 # off to a new step... + + def tapStepGate(self): # give it a tap, release the thread waiting there + self.verifyThreadAlive() + self.verifyThreadMain() # only allowed for main thread + + logger.debug("Tapping worker thread {}".format(self._tid)) + self._stepGate.set() # wake up! + time.sleep(0) # let the released thread run a bit + + def execSql(self, sql): # not "execute", since we are out side the DB context + if ( gConfig.per_thread_db_connection ): + return self._dbConn.execute(sql) + else: + return self._tc.getDbState().getDbConn().execute(sql) + + def querySql(self, sql): # not "execute", since we are out side the DB context + if ( gConfig.per_thread_db_connection ): + return self._dbConn.query(sql) + else: + return self._tc.getDbState().getDbConn().query(sql) + +class ThreadCoordinator: + def __init__(self, pool, wd: WorkDispatcher, dbState): + self._curStep = -1 # first step is 0 + self._pool = pool + self._wd = wd + self._te = None # prepare for every new step + self._dbState = dbState + self._executedTasks: List[Task] = [] # in a given step + self._lock = threading.RLock() # sync access for a few things + + self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads + + def getTaskExecutor(self): + return self._te + + def getDbState(self) -> DbState : + return self._dbState + + def crossStepBarrier(self): + self._stepBarrier.wait() + + def run(self): + self._pool.createAndStartThreads(self) + + # Coordinate all threads step by step + self._curStep = -1 # 
not started yet + maxSteps = gConfig.max_steps # type: ignore + while(self._curStep < maxSteps): + print(".", end="", flush=True) + logger.debug("Main thread going to sleep") + + # Now ready to enter a step + self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate + self._stepBarrier.reset() # Other worker threads should now be at the "gate" + + # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" + self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state + self.resetExecutedTasks() # clear the tasks after we are done + + # Get ready for next step + logger.info("<-- Step {} finished".format(self._curStep)) + self._curStep += 1 # we are about to get into next step. TODO: race condition here! + logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep + + # A new TE for the new step + self._te = TaskExecutor(self._curStep) + + logger.debug("Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep + self.tapAllThreads() + + logger.debug("Main thread ready to finish up...") + self.crossStepBarrier() # Cross it one last time, after all threads finish + self._stepBarrier.reset() + logger.debug("Main thread in exclusive zone...") + self._te = None # No more executor, time to end + logger.debug("Main thread tapping all threads one last time...") + self.tapAllThreads() # Let the threads run one last time + logger.debug("Main thread joining all threads") + self._pool.joinAll() # Get all threads to finish + + logger.info("All threads finished") + print("\r\nFinished") + + def tapAllThreads(self): # in a deterministic manner + wakeSeq = [] + for i in range(self._pool.numThreads): # generate a random sequence + if Dice.throw(2) == 1 : + wakeSeq.append(i) + else: + wakeSeq.insert(0, i) + logger.info("Waking up threads: 
{}".format(str(wakeSeq))) + # TODO: set dice seed to a deterministic value + for i in wakeSeq: + self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?! + time.sleep(0) # yield + + def isRunning(self): + return self._te != None + + def fetchTask(self) -> Task : + if ( not self.isRunning() ): # no task + raise RuntimeError("Cannot fetch task when not running") + # return self._wd.pickTask() + # Alternatively, let's ask the DbState for the appropriate task + dbState = self.getDbState() + tasks = dbState.getTasksAtState() + i = Dice.throw(len(tasks)) + # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. + return tasks[i].clone() + + def resetExecutedTasks(self): + self._executedTasks = [] # should be under single thread + + def saveExecutedTask(self, task): + with self._lock: + self._executedTasks.append(task) + +# We define a class to run a number of threads in locking steps. +class ThreadPool: + def __init__(self, dbState, numThreads, maxSteps, funcSequencer): + self.numThreads = numThreads + self.maxSteps = maxSteps + self.funcSequencer = funcSequencer + # Internal class variables + self.dispatcher = WorkDispatcher(dbState) + self.curStep = 0 + self.threadList = [] + # self.stepGate = threading.Condition() # Gate to hold/sync all threads + # self.numWaitingThreads = 0 + + # starting to run all the threads, in locking steps + def createAndStartThreads(self, tc: ThreadCoordinator): + for tid in range(0, self.numThreads): # Create the threads + workerThread = WorkerThread(self, tid, tc) + self.threadList.append(workerThread) + workerThread.start() # start, but should block immediately before step 0 + + def joinAll(self): + for workerThread in self.threadList: + logger.debug("Joining thread...") + workerThread._thread.join() + +# A queue of continguous POSITIVE integers +class LinearQueue(): + def __init__(self): + self.firstIndex = 1 # 1st ever element + self.lastIndex = 0 + self._lock = threading.RLock() # our functions 
may call each other + self.inUse = set() # the indexes that are in use right now + + def toText(self): + return "[{}..{}], in use: {}".format(self.firstIndex, self.lastIndex, self.inUse) + + # Push (add new element, largest) to the tail, and mark it in use + def push(self): + with self._lock: + # if ( self.isEmpty() ): + # self.lastIndex = self.firstIndex + # return self.firstIndex + # Otherwise we have something + self.lastIndex += 1 + self.allocate(self.lastIndex) + # self.inUse.add(self.lastIndex) # mark it in use immediately + return self.lastIndex + + def pop(self): + with self._lock: + if ( self.isEmpty() ): + # raise RuntimeError("Cannot pop an empty queue") + return False # TODO: None? + + index = self.firstIndex + if ( index in self.inUse ): + return False + + self.firstIndex += 1 + return index + + def isEmpty(self): + return self.firstIndex > self.lastIndex + + def popIfNotEmpty(self): + with self._lock: + if (self.isEmpty()): + return 0 + return self.pop() + + def allocate(self, i): + with self._lock: + # logger.debug("LQ allocating item {}".format(i)) + if ( i in self.inUse ): + raise RuntimeError("Cannot re-use same index in queue: {}".format(i)) + self.inUse.add(i) + + def release(self, i): + with self._lock: + # logger.debug("LQ releasing item {}".format(i)) + self.inUse.remove(i) # KeyError possible, TODO: why? 
+ + def size(self): + return self.lastIndex + 1 - self.firstIndex + + def pickAndAllocate(self): + if ( self.isEmpty() ): + return None + with self._lock: + cnt = 0 # counting the interations + while True: + cnt += 1 + if ( cnt > self.size()*10 ): # 10x iteration already + # raise RuntimeError("Failed to allocate LinearQueue element") + return None + ret = Dice.throwRange(self.firstIndex, self.lastIndex+1) + if ( not ret in self.inUse ): + self.allocate(ret) + return ret + +class DbConn: + def __init__(self): + self._conn = None + self._cursor = None + self.isOpen = False + + def open(self): # Open connection + if ( self.isOpen ): + raise RuntimeError("Cannot re-open an existing DB connection") + + cfgPath = "../../build/test/cfg" + self._conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable + self._cursor = self._conn.cursor() + + # Get the connection/cursor ready + self._cursor.execute('reset query cache') + # self._cursor.execute('use db') + + # Open connection + self._tdSql = TDSql() + self._tdSql.init(self._cursor) + self.isOpen = True + + def resetDb(self): # reset the whole database, etc. + if ( not self.isOpen ): + raise RuntimeError("Cannot reset database until connection is open") + # self._tdSql.prepare() # Recreate database, etc. 
+ + self._cursor.execute('drop database if exists db') + logger.debug("Resetting DB, dropped database") + # self._cursor.execute('create database db') + # self._cursor.execute('use db') + + # tdSql.execute('show databases') + + def close(self): + if ( not self.isOpen ): + raise RuntimeError("Cannot clean up database until connection is open") + self._tdSql.close() + self.isOpen = False + + def execute(self, sql): + if ( not self.isOpen ): + raise RuntimeError("Cannot execute database commands until connection is open") + return self._tdSql.execute(sql) + + def query(self, sql) -> int : # return number of rows retrieved + if ( not self.isOpen ): + raise RuntimeError("Cannot query database until connection is open") + return self._tdSql.query(sql) + + +# State of the database as we believe it to be +class DbState(): + STATE_INVALID = -1 + STATE_EMPTY = 1 # nothing there, no even a DB + STATE_DB_ONLY = 2 # we have a DB, but nothing else + STATE_TABLE_ONLY = 3 # we have a table, but totally empty + STATE_HAS_DATA = 4 # we have some data in the table + + def __init__(self): + self.tableNumQueue = LinearQueue() + self._lastTick = datetime.datetime(2019, 1, 1) # initial date time tick + self._lastInt = 0 # next one is initial integer + self._lock = threading.RLock() + self._state = self.STATE_INVALID + + # self.openDbServerConnection() + self._dbConn = DbConn() + try: + self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected + except taos.error.ProgrammingError as err: + # print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err)) + if ( err.msg == 'disconnected' ): # cannot open DB connection + print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.") + sys.exit() + else: + raise + except: + print("[=]Unexpected exception") + raise + self._dbConn.resetDb() # drop and recreate DB + self._state = self.STATE_EMPTY # initial state, the result of above + + def getDbConn(self): + return 
self._dbConn + + def pickAndAllocateTable(self): # pick any table, and "use" it + return self.tableNumQueue.pickAndAllocate() + + def addTable(self): + with self._lock: + tIndex = self.tableNumQueue.push() + return tIndex + + def getFixedTableName(self): + return "fixed_table" + + def releaseTable(self, i): # return the table back, so others can use it + self.tableNumQueue.release(i) + + def getNextTick(self): + with self._lock: # prevent duplicate tick + self._lastTick += datetime.timedelta(0, 1) # add one second to it + return self._lastTick + + def getNextInt(self): + with self._lock: + self._lastInt += 1 + return self._lastInt + + def getTableNameToDelete(self): + tblNum = self.tableNumQueue.pop() # TODO: race condition! + if ( not tblNum ): # maybe false + return False + + return "table_{}".format(tblNum) + + def execSql(self, sql): # using the main DB connection + return self._dbConn.execute(sql) + + def cleanUp(self): + self._dbConn.close() + + def getTasksAtState(self): + tasks = [] + tasks.append(ReadFixedDataTask(self)) # always + if ( self._state == self.STATE_EMPTY ): + tasks.append(CreateDbTask(self)) + tasks.append(CreateFixedTableTask(self)) + elif ( self._state == self.STATE_DB_ONLY ): + tasks.append(DropDbTask(self)) + tasks.append(CreateFixedTableTask(self)) + tasks.append(AddFixedDataTask(self)) + elif ( self._state == self.STATE_TABLE_ONLY ): + tasks.append(DropFixedTableTask(self)) + tasks.append(AddFixedDataTask(self)) + elif ( self._state == self.STATE_HAS_DATA ) : # same as above. 
TODO: adjust + tasks.append(DropFixedTableTask(self)) + tasks.append(AddFixedDataTask(self)) + else: + raise RuntimeError("Unexpected DbState state: {}".format(self._state)) + return tasks + + def transition(self, tasks): + if ( len(tasks) == 0 ): # before 1st step, or otherwise empty + return # do nothing + if ( self._state == self.STATE_EMPTY ): + # self.assertNoSuccess(tasks, ReadFixedDataTask) # some read may be successful, since we might be creating a table + if ( self.hasSuccess(tasks, CreateDbTask) ): + self.assertAtMostOneSuccess(tasks, CreateDbTask) # param is class + self._state = self.STATE_DB_ONLY + if ( self.hasSuccess(tasks, CreateFixedTableTask )): + self._state = self.STATE_TABLE_ONLY + # else: # no successful table creation, not much we can say, as it is step 2 + else: # did not create db + self.assertNoTask(tasks, CreateDbTask) # because we did not have such task + # self.assertNoSuccess(tasks, CreateDbTask) # not necessary, since we just verified no such task + self.assertNoSuccess(tasks, CreateFixedTableTask) + + elif ( self._state == self.STATE_DB_ONLY ): + self.assertAtMostOneSuccess(tasks, DropDbTask) + self.assertIfExistThenSuccess(tasks, DropDbTask) + self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) + # Nothing to be said about adding data task + if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB + # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess + self.assertAtMostOneSuccess(tasks, DropDbTask) + self._state = self.STATE_EMPTY + elif ( self.hasSuccess(tasks, CreateFixedTableTask) ): # did not drop db, create table success + # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table + self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # at most 1 attempt is successful + self.assertNoTask(tasks, DropDbTask) # should have have tried + if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet + # can't say there's add-data attempts, since they may all fail + 
self._state = self.STATE_TABLE_ONLY + else: + self._state = self.STATE_HAS_DATA + else: # no success in dropping db tasks, no success in create fixed table, not acceptable + raise RuntimeError("Unexpected no-success scenario") + + elif ( self._state == self.STATE_TABLE_ONLY ): + if ( self.hasSuccess(tasks, DropFixedTableTask) ): + self.assertAtMostOneSuccess(tasks, DropFixedTableTask) + self._state = self.STATE_DB_ONLY + elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table + self.assertNoTask(tasks, DropFixedTableTask) + self._state = self.STATE_HAS_DATA + else: # did not drop table, did not insert data, that is impossible + raise RuntimeError("Unexpected no-success scenarios") + + elif ( self._state == self.STATE_HAS_DATA ): # Same as above, TODO: adjust + if ( self.hasSuccess(tasks, DropFixedTableTask) ): + self.assertAtMostOneSuccess(tasks, DropFixedTableTask) + self._state = self.STATE_DB_ONLY + elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table + self.assertNoTask(tasks, DropFixedTableTask) + self._state = self.STATE_HAS_DATA + else: # did not drop table, did not insert data, that is impossible + raise RuntimeError("Unexpected no-success scenarios") + + else: + raise RuntimeError("Unexpected DbState state: {}".format(self._state)) + logger.debug("New DB state is: {}".format(self._state)) + + def assertAtMostOneSuccess(self, tasks, cls): + sCnt = 0 + for task in tasks : + if not isinstance(task, cls): + continue + if task.isSuccess(): + task.logDebug("Task success found") + sCnt += 1 + if ( sCnt >= 2 ): + raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls)) + + def assertIfExistThenSuccess(self, tasks, cls): + sCnt = 0 + exists = False + for task in tasks : + if not isinstance(task, cls): + continue + exists = True # we have a valid instance + if task.isSuccess(): + sCnt += 1 + if ( exists and sCnt <= 0 ): + raise RuntimeError("Unexpected zero success for task: 
{}".format(cls)) + + def assertNoTask(self, tasks, cls): + for task in tasks : + if isinstance(task, cls): + raise RuntimeError("Unexpected task: {}".format(cls)) + + def assertNoSuccess(self, tasks, cls): + for task in tasks : + if isinstance(task, cls): + if task.isSuccess(): + raise RuntimeError("Unexpected successful task: {}".format(cls)) + + def hasSuccess(self, tasks, cls): + for task in tasks : + if not isinstance(task, cls): + continue + if task.isSuccess(): + return True + return False + +class TaskExecutor(): + def __init__(self, curStep): + self._curStep = curStep + + def getCurStep(self): + return self._curStep + + def execute(self, task: Task, wt: WorkerThread): # execute a task on a thread + task.execute(wt) + + # def logInfo(self, msg): + # logger.info(" T[{}.x]: ".format(self._curStep) + msg) + + # def logDebug(self, msg): + # logger.debug(" T[{}.x]: ".format(self._curStep) + msg) + +class Task(): + taskSn = 100 + + @classmethod + def allocTaskNum(cls): + cls.taskSn += 1 + return cls.taskSn + + def __init__(self, dbState: DbState): + self._dbState = dbState + self._workerThread = None + self._err = None + self._curStep = None + self._numRows = None # Number of rows affected + + # Assign an incremental task serial number + self._taskNum = self.allocTaskNum() + + def isSuccess(self): + return self._err == None + + def clone(self): + newTask = self.__class__(self._dbState) + return newTask + + def logDebug(self, msg): + self._workerThread.logDebug("s[{}.{}] {}".format(self._curStep, self._taskNum, msg)) + + def logInfo(self, msg): + self._workerThread.logInfo("s[{}.{}] {}".format(self._curStep, self._taskNum, msg)) + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__)) + + def execute(self, wt: WorkerThread): + wt.verifyThreadSelf() + self._workerThread = wt # type: ignore + + te = wt.getTaskExecutor() + self._curStep = 
te.getCurStep() + self.logDebug("[-] executing task {}...".format(self.__class__.__name__)) + + self._err = None + try: + self._executeInternal(te, wt) # TODO: no return value? + except taos.error.ProgrammingError as err: + self.logDebug("[=]Taos Execution exception: {0}".format(err)) + self._err = err + except: + self.logDebug("[=]Unexpected exception") + raise + + self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure")) + + def execSql(self, sql): + return self._dbState.execute(sql) + +class CreateDbTask(Task): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + wt.execSql("create database db") + +class DropDbTask(Task): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + wt.execSql("drop database db") + +class CreateTableTask(Task): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tIndex = self._dbState.addTable() + self.logDebug("Creating a table {} ...".format(tIndex)) + wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex)) + self.logDebug("Table {} created.".format(tIndex)) + self._dbState.releaseTable(tIndex) + +class CreateFixedTableTask(Task): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tblName = self._dbState.getFixedTableName() + wt.execSql("create table db.{} (ts timestamp, speed int)".format(tblName)) + +class ReadFixedDataTask(Task): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tblName = self._dbState.getFixedTableName() + self._numRows = wt.querySql("select * from db.{}".format(tblName)) # save the result for later + # tdSql.query(" cars where tbname in ('carzero', 'carone')") + +class DropTableTask(Task): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tableName = self._dbState.getTableNameToDelete() + if ( not tableName ): # May be "False" + self.logInfo("Cannot generate a table to delete, skipping...") + return + 
self.logInfo("Dropping a table db.{} ...".format(tableName)) + wt.execSql("drop table db.{}".format(tableName)) + +class DropFixedTableTask(Task): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tblName = self._dbState.getFixedTableName() + wt.execSql("drop table db.{}".format(tblName)) + +class AddDataTask(Task): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + ds = self._dbState + self.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText())) + tIndex = ds.pickAndAllocateTable() + if ( tIndex == None ): + self.logInfo("No table found to add data, skipping...") + return + sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) + self.logDebug("Executing SQL: {}".format(sql)) + wt.execSql(sql) + ds.releaseTable(tIndex) + self.logDebug("Finished adding data") + +class AddFixedDataTask(Task): + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + ds = self._dbState + sql = "insert into db.table_{} values ('{}', {});".format(ds.getFixedTableName(), ds.getNextTick(), ds.getNextInt()) + wt.execSql(sql) + +# Deterministic random number generator +class Dice(): + seeded = False # static, uninitialized + + @classmethod + def seed(cls, s): # static + if (cls.seeded): + raise RuntimeError("Cannot seed the random generator more than once") + cls.verifyRNG() + random.seed(s) + cls.seeded = True # TODO: protect against multi-threading + + @classmethod + def verifyRNG(cls): # Verify that the RNG is determinstic + random.seed(0) + x1 = random.randrange(0, 1000) + x2 = random.randrange(0, 1000) + x3 = random.randrange(0, 1000) + if ( x1 != 864 or x2!=394 or x3!=776 ): + raise RuntimeError("System RNG is not deterministic") + + @classmethod + def throw(cls, stop): # get 0 to stop-1 + return cls.throwRange(0, stop) + + @classmethod + def throwRange(cls, start, stop): # up to stop-1 + if ( not cls.seeded ): + raise RuntimeError("Cannot throw dice before seeding it") + 
return random.randrange(start, stop) + + +# Anyone needing to carry out work should simply come here +class WorkDispatcher(): + def __init__(self, dbState): + # self.totalNumMethods = 2 + self.tasks = [ + CreateTableTask(dbState), + DropTableTask(dbState), + AddDataTask(dbState), + ] + + def throwDice(self): + max = len(self.tasks) - 1 + dRes = random.randint(0, max) + # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes)) + return dRes + + def pickTask(self): + dice = self.throwDice() + return self.tasks[dice] + + def doWork(self, workerThread): + task = self.pickTask() + task.execute(workerThread) + +def main(): + # Super cool Python argument library: https://docs.python.org/3/library/argparse.html + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + description=textwrap.dedent('''\ + TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below) + --------------------------------------------------------------------- + 1. You build TDengine in the top level ./build directory, as described in offical docs + 2. 
You run the server there before this script: ./build/bin/taosd -c test/cfg + + ''')) + parser.add_argument('-p', '--per-thread-db-connection', action='store_true', + help='Use a single shared db connection (default: false)') + parser.add_argument('-d', '--debug', action='store_true', + help='Turn on DEBUG mode for more logging (default: false)') + parser.add_argument('-s', '--max-steps', action='store', default=100, type=int, + help='Maximum number of steps to run (default: 100)') + parser.add_argument('-t', '--num-threads', action='store', default=10, type=int, + help='Number of threads to run (default: 10)') + + global gConfig + gConfig = parser.parse_args() + if len(sys.argv) == 1: + parser.print_help() + sys.exit() + + global logger + logger = logging.getLogger('myApp') + if ( gConfig.debug ): + logger.setLevel(logging.DEBUG) # default seems to be INFO + ch = logging.StreamHandler() + logger.addHandler(ch) + + dbState = DbState() + Dice.seed(0) # initial seeding of dice + tc = ThreadCoordinator( + ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0), + WorkDispatcher(dbState), + dbState + ) + tc.run() + dbState.cleanUp() + logger.info("Finished running thread pool") + +if __name__ == "__main__": + main() diff --git a/tests/pytest/crash_gen_0519.sh b/tests/pytest/crash_gen_0519.sh new file mode 100755 index 0000000000..3e744d7926 --- /dev/null +++ b/tests/pytest/crash_gen_0519.sh @@ -0,0 +1,41 @@ +#!/bin/bash + +# This is the script for us to try to cause the TDengine server or client to crash +# +# PREPARATION +# +# 1. Build an compile the TDengine source code that comes with this script, in the same directory tree +# 2. Please follow the direction in our README.md, and build TDengine in the build/ directory +# 3. Adjust the configuration file if needed under build/test/cfg/taos.cfg +# 4. Run the TDengine server instance: cd build; ./build/bin/taosd -c test/cfg +# 5. 
Make sure you have a working Python3 environment: run /usr/bin/python3 --version, and you should get 3.6 or above +# 6. Make sure you have the proper Python packages: # sudo apt install python3-setuptools python3-pip python3-distutils +# +# RUNNING THIS SCRIPT +# +# This script assumes the source code directory is intact, and that the binaries have been built in the +# build/ directory, as such, we will load the Python libraries in the directory tree, and also load +# the TDengine client shared library (so) file, in the build/ directory, as evidenced in the env +# variables below. +# +# Running the script is simple, no parameter is needed (for now, but will change in the future). +# +# Happy Crashing... + + +# Due to the heavy path name assumptions/usage, let us require that the user be in the current directory +EXEC_DIR=`dirname "$0"` +if [[ $EXEC_DIR != "." ]] +then + echo "ERROR: Please execute `basename "$0"` in its own directory (for now anyway, pardon the dust)" + exit -1 +fi + +# First we need to set up a path for Python to find our own TAOS modules, so that "import" can work. +export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3 + +# Then let us set up the library path so that our compiled SO file can be loaded by Python +export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)/../../build/build/lib + +# Now we are all set, and let's see if we can find a crash.
Note we pass all params +./crash_gen_0519.py $@ From 5911bf64c33bec340be7323e2c5f503fd9dbba52 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 19 May 2020 01:50:14 +0000 Subject: [PATCH 17/17] fix clean up coredump --- src/tsdb/src/tsdbMeta.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 5516231483..80bfe58f0a 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -154,6 +154,7 @@ STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) { STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta)); if (pMeta == NULL) return NULL; + pMeta->maxTables = maxTables; pMeta->nTables = 0; pMeta->superList = NULL; pMeta->tables = (STable **)calloc(maxTables, sizeof(STable *));