From fcbe6c154d98f2d747dabd1e73e1895d4f69bcbf Mon Sep 17 00:00:00 2001 From: Steven Li Date: Sat, 27 Jun 2020 21:10:15 -0700 Subject: [PATCH 01/24] Adjusted crash_gen to examine returned error codes --- tests/pytest/crash_gen.py | 192 +++++++++++++++++++++++++++++--------- 1 file changed, 147 insertions(+), 45 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 916e8904ff..94ad63697c 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -78,13 +78,22 @@ class WorkerThread: if ( gConfig.per_thread_db_connection ): # type: ignore self._dbConn = DbConn() + self._dbInUse = False # if "use db" was executed already + def logDebug(self, msg): logger.debug(" TRD[{}] {}".format(self._tid, msg)) def logInfo(self, msg): logger.info(" TRD[{}] {}".format(self._tid, msg)) - + def dbInUse(self): + return self._dbInUse + + def useDb(self): + if ( not self._dbInUse ): + self.execSql("use db") + self._dbInUse = True + def getTaskExecutor(self): return self._tc.getTaskExecutor() @@ -118,12 +127,17 @@ class WorkerThread: logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") break + # Fetch a task from the Thread Coordinator logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid)) task = tc.fetchTask() + + # Execute such a task logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format(self._tid, task.__class__.__name__)) task.execute(self) tc.saveExecutedTask(task) logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid)) + + self._dbInUse = False # there may be changes between steps def verifyThreadSelf(self): # ensure we are called by this own thread if ( threading.get_ident() != self._thread.ident ): @@ -163,6 +177,18 @@ class WorkerThread: else: return self._tc.getDbManager().getDbConn().execute(sql) + def querySql(self, sql): # TODO: expose DbConn directly + if ( gConfig.per_thread_db_connection ): + return self._dbConn.query(sql) + else: + return self._tc.getDbManager().getDbConn().query(sql) + + def getQueryResult(self): + if ( gConfig.per_thread_db_connection ): + return self._dbConn.getQueryResult() + else: + return self._tc.getDbManager().getDbConn().getQueryResult() + def getDbConn(self): if ( gConfig.per_thread_db_connection ): return self._dbConn @@ -176,7 +202,7 @@ class WorkerThread: # return self._tc.getDbState().getDbConn().query(sql) class ThreadCoordinator: - def __init__(self, pool, dbManager): + def __init__(self, pool: ThreadPool, dbManager): self._curStep = -1 # first step is 0 self._pool = pool # self._wd = wd @@ -216,7 +242,16 @@ class ThreadCoordinator: # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" try: - self._dbManager.getStateMachine().transition(self._executedTasks) # at end of step, transiton the DB state + sm = self._dbManager.getStateMachine() + logger.debug("[STT] starting transitions") + sm.transition(self._executedTasks) # at end of step, transiton the DB state + logger.debug("[STT] transition ended") + if sm.hasDatabase() : + for t in self._pool.threadList: + logger.debug("[DB] use db for all worker threads") + t.useDb() + # t.execSql("use db") # main thread executing "use db" on behalf of every worker thread + except taos.error.ProgrammingError as err: if ( err.msg == 'network unavailable' ): # broken DB connection logger.info("DB connection broken, execution failed") @@ -268,7 +303,7 @@ class ThreadCoordinator: wakeSeq.append(i) else: wakeSeq.insert(0, i) - 
logger.debug("[TRD] Main thread waking up worker thread: {}".format(str(wakeSeq))) + logger.debug("[TRD] Main thread waking up worker threads: {}".format(str(wakeSeq))) # TODO: set dice seed to a deterministic value for i in wakeSeq: self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?! @@ -306,7 +341,7 @@ class ThreadPool: self.maxSteps = maxSteps # Internal class variables self.curStep = 0 - self.threadList = [] + self.threadList = [] # type: List[WorkerThread] # starting to run all the threads, in locking steps def createAndStartThreads(self, tc: ThreadCoordinator): @@ -412,7 +447,7 @@ class DbConn: # Get the connection/cursor ready self._cursor.execute('reset query cache') - # self._cursor.execute('use db') # note we do this in _findCurrenState + # self._cursor.execute('use db') # do this at the beginning of every step # Open connection self._tdSql = TDSql() @@ -450,7 +485,7 @@ class DbConn: raise RuntimeError("Cannot query database until connection is open") logger.debug("[SQL] Executing SQL: {}".format(sql)) nRows = self._tdSql.query(sql) - logger.debug("[SQL] Execution Result, nRows = {}, SQL = {}".format(nRows, sql)) + logger.debug("[SQL] Query Result, nRows = {}, SQL = {}".format(nRows, sql)) return nRows # results are in: return self._tdSql.queryResult @@ -620,10 +655,10 @@ class StateDbOnly(AnyState): # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess # self.assertAtMostOneSuccess(tasks, DropDbTask) # self._state = self.STATE_EMPTY - if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success - # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table - if ( not self.hasTask(tasks, TaskDropSuperTable) ): - self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything + # if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success + # # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table + # if ( not self.hasTask(tasks, TaskDropSuperTable) ): + # self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything # self.assertNoTask(tasks, DropDbTask) # should have have tried # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet # # can't say there's add-data attempts, since they may all fail @@ -648,7 +683,9 @@ class StateSuperTableOnly(AnyState): def verifyTasksToState(self, tasks, newState): if ( self.hasSuccess(tasks, TaskDropSuperTable) ): # we are able to drop the table - self.assertAtMostOneSuccess(tasks, TaskDropSuperTable) + #self.assertAtMostOneSuccess(tasks, TaskDropSuperTable) + self.hasSuccess(tasks, TaskCreateSuperTable) # we must have had recreted it + # self._state = self.STATE_DB_ONLY # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data # self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases @@ -692,7 +729,7 @@ class StateHasData(AnyState): self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) -class StateMechine : +class StateMechine: def __init__(self, dbConn): self._dbConn = dbConn self._curState = self._findCurrentState() # starting state @@ -701,8 +738,17 @@ class StateMechine : def getCurrentState(self): return self._curState + def hasDatabase(self): + return self._curState.canDropDb() # ha, can drop DB means it has one + # 
May be slow, use cautionsly... def getTaskTypes(self): # those that can run (directly/indirectly) from the current state + def typesToStrings(types): + ss = [] + for t in types: + ss.append(t.__name__) + return ss + allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks firstTaskTypes = [] for tc in allTaskClasses: @@ -721,7 +767,7 @@ class StateMechine : if len(taskTypes) <= 0: raise RuntimeError("No suitable task types found for state: {}".format(self._curState)) - logger.debug("[OPS] Tasks found for state {}: {}".format(self._curState, taskTypes)) + logger.debug("[OPS] Tasks found for state {}: {}".format(self._curState, typesToStrings(taskTypes))) return taskTypes def _findCurrentState(self): @@ -731,7 +777,7 @@ class StateMechine : # logger.debug("Found EMPTY state") logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time())) return StateEmpty() - dbc.execute("use db") # did not do this when openning connection + dbc.execute("use db") # did not do this when openning connection, and this is NOT the worker thread, which does this on their own if dbc.query("show tables") == 0 : # no tables # logger.debug("Found DB ONLY state") logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) @@ -747,6 +793,7 @@ class StateMechine : def transition(self, tasks): if ( len(tasks) == 0 ): # before 1st step, or otherwise empty + logger.debug("[STT] Starting State: {}".format(self._curState)) return # do nothing self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps @@ -830,7 +877,7 @@ class DbManager(): def getDbConn(self): return self._dbConn - def getStateMachine(self): + def getStateMachine(self) -> StateMechine : return self._stateMachine # def getState(self): @@ -931,6 +978,7 @@ class Task(): # logger.debug("Creating new task {}...".format(self._taskNum)) self._execStats = execStats + self._lastSql = "" # last SQL executed/attempted def isSuccess(self): return self._err == None @@ -961,10 +1009,16 @@ class Task(): try: self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: - self.logDebug("[=] Taos library exception: errno={:X}, msg: {}".format(err.errno, err)) - self._err = err + errno2 = 0x80000000 + err.errno # positive error number + if ( errno2 in [0x200, 0x360, 0x362, 0x381, 0x380, 0x600 ]) : # allowed errors + self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)) + print("e", end="", flush=True) + self._err = err + else: + self.logDebug("[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)) + raise except: - self.logDebug("[=] Unexpected exception") + self.logDebug("[=] Unexpected exception, SQL: {}".format(self._lastSql)) raise self._execStats.endTaskType(self.__class__.__name__, self.isSuccess()) @@ -972,8 +1026,21 @@ class Task(): self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above. 
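The error-handling hunk above folds the client's raw errno into TDengine's positive error-code scheme (0x80000000 + errno) before checking it against a whitelist of acceptable errors. A minimal standalone sketch of that check, assuming the errno may arrive either negative (as the client reports it) or already positive, and reusing the whitelist values from the hunk:

ACCEPTABLE_ERRNOS = {0x200, 0x360, 0x362, 0x380, 0x381, 0x600}  # values taken from the hunk above

def normalize_errno(errno: int) -> int:
    # the TDengine Python client may report a negative errno; map it onto the positive scheme
    return errno if errno > 0 else 0x80000000 + errno

def is_acceptable(errno: int) -> bool:
    return normalize_errno(errno) in ACCEPTABLE_ERRNOS

# e.g. is_acceptable(0x362) and is_acceptable(0x362 - 0x80000000) are both True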
def execSql(self, sql): + self._lastSql = sql return self._dbManager.execute(sql) + def execWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread + self._lastSql = sql + return wt.execSql(sql) + + def queryWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread + self._lastSql = sql + return wt.querySql(sql) + + def getQueryResult(self, wt: WorkerThread): # execute an SQL on the worker thread + return wt.getQueryResult() + + class ExecutionStats: def __init__(self): @@ -1039,6 +1106,11 @@ class ExecutionStats: class StateTransitionTask(Task): + LARGE_NUMBER_OF_TABLES = 35 + SMALL_NUMBER_OF_TABLES = 3 + LARGE_NUMBER_OF_RECORDS = 50 + SMALL_NUMBER_OF_RECORDS = 3 + @classmethod def getInfo(cls): # each sub class should supply their own information raise RuntimeError("Overriding method expected") @@ -1061,6 +1133,10 @@ class StateTransitionTask(Task): # return state.getValue() in cls.getBeginStates() raise RuntimeError("must be overriden") + @classmethod + def getRegTableName(cls, i): + return "db.reg_table_{}".format(i) + def execute(self, wt: WorkerThread): super().execute(wt) @@ -1074,7 +1150,7 @@ class TaskCreateDb(StateTransitionTask): return state.canCreateDb() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - wt.execSql("create database db") + self.execWtSql(wt, "create database db") class TaskDropDb(StateTransitionTask): @classmethod @@ -1086,7 +1162,7 @@ class TaskDropDb(StateTransitionTask): return state.canDropDb() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - wt.execSql("drop database db") + self.execWtSql(wt, "drop database db") logger.debug("[OPS] database dropped at {}".format(time.time())) class TaskCreateSuperTable(StateTransitionTask): @@ -1099,8 +1175,13 @@ class TaskCreateSuperTable(StateTransitionTask): return state.canCreateFixedSuperTable() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tblName = self._dbManager.getFixedSuperTableName() - wt.execSql("create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) + if not wt.dbInUse(): # no DB yet, to the best of our knowledge + logger.debug("Skipping task, no DB yet") + return + + tblName = self._dbManager.getFixedSuperTableName() + # wt.execSql("use db") # should always be in place + self.execWtSql(wt, "create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) # No need to create the regular tables, INSERT will do that automatically @@ -1115,16 +1196,16 @@ class TaskReadData(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): sTbName = self._dbManager.getFixedSuperTableName() - dbc = wt.getDbConn() - dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later + self.queryWtSql(wt, "select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later + if random.randrange(5) == 0 : # 1 in 5 chance, simulate a broken connection. 
TODO: break connection in all situations - dbc.close() - dbc.open() + wt.getDbConn().close() + wt.getDbConn().open() else: - rTables = dbc.getQueryResult() + rTables = self.getQueryResult(wt) # wt.getDbConn().getQueryResult() # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) for rTbName in rTables : # regular tables - dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure + self.execWtSql(wt, "select * from db.{}".format(rTbName[0])) # tdSql.query(" cars where tbname in ('carzero', 'carone')") @@ -1138,8 +1219,31 @@ class TaskDropSuperTable(StateTransitionTask): return state.canDropFixedSuperTable() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tblName = self._dbManager.getFixedSuperTableName() - wt.execSql("drop table db.{}".format(tblName)) + # 1/2 chance, we'll drop the regular tables one by one, in a randomized sequence + if Dice.throw(2) == 0 : + tblSeq = list(range(2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))) + random.shuffle(tblSeq) + tickOutput = False # if we have spitted out a "d" character for "drop regular table" + for i in tblSeq: + regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i) + try: + nRows = self.execWtSql(wt, "drop table {}".format(regTableName)) + except taos.error.ProgrammingError as err: + errno2 = 0x80000000 + err.errno # positive error number + if ( errno2 in [0x362]) : # allowed errors + logger.debug("[DB] Acceptable error when dropping a table") + continue + + if (not tickOutput): + tickOutput = True # Print only one time + if nRows >= 1 : + print("d", end="", flush=True) + else: + print("f({})".format(nRows), end="", flush=True) + + # Drop the super table itself + tblName = self._dbManager.getFixedSuperTableName() + self.execWtSql(wt, "drop table db.{}".format(tblName)) class TaskAlterTags(StateTransitionTask): @classmethod @@ -1154,20 +1258,18 @@ class TaskAlterTags(StateTransitionTask): tblName = self._dbManager.getFixedSuperTableName() dice = Dice.throw(4) if dice == 0 : - wt.execSql("alter table db.{} add tag extraTag int".format(tblName)) + sql = "alter table db.{} add tag extraTag int".format(tblName) elif dice == 1 : - wt.execSql("alter table db.{} drop tag extraTag".format(tblName)) + sql = "alter table db.{} drop tag extraTag".format(tblName) elif dice == 2 : - wt.execSql("alter table db.{} drop tag newTag".format(tblName)) + sql = "alter table db.{} drop tag newTag".format(tblName) else: # dice == 3 - wt.execSql("alter table db.{} change tag extraTag newTag".format(tblName)) + sql = "alter table db.{} change tag extraTag newTag".format(tblName) + + self.execWtSql(wt, sql) class TaskAddData(StateTransitionTask): activeTable : Set[int] = set() # Track which table is being actively worked on - LARGE_NUMBER_OF_TABLES = 35 - SMALL_NUMBER_OF_TABLES = 3 - LARGE_NUMBER_OF_RECORDS = 50 - SMALL_NUMBER_OF_RECORDS = 3 # We use these two files to record operations to DB, useful for power-off tests fAddLogReady = None @@ -1193,7 +1295,7 @@ class TaskAddData(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): ds = self._dbManager - wt.execSql("use db") # TODO: seems to be an INSERT bug to require this + # wt.execSql("use db") # TODO: seems to be an INSERT bug to require this tblSeq = list(range(self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)) random.shuffle(tblSeq) for i in tblSeq: @@ -1203,10 +1305,10 @@ class TaskAddData(StateTransitionTask): print("x", 
end="", flush=True) else: self.activeTable.add(i) # marking it active - # No need to shuffle data sequence, unless later we decide to do non-increment insertion + # No need to shuffle data sequence, unless later we decide to do non-increment insertion + regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i) for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS) : # number of records per table - nextInt = ds.getNextInt() - regTableName = "db.reg_table_{}".format(i) + nextInt = ds.getNextInt() if gConfig.record_ops: self.prepToRecordOps() self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) @@ -1217,7 +1319,7 @@ class TaskAddData(StateTransitionTask): ds.getFixedSuperTableName(), ds.getNextBinary(), ds.getNextFloat(), ds.getNextTick(), nextInt) - wt.execSql(sql) + self.execWtSql(wt, sql) if gConfig.record_ops: self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) self.fAddLogDone.flush() @@ -1390,7 +1492,7 @@ def main(): # if len(sys.argv) == 1: # parser.print_help() # sys.exit() - + global logger logger = logging.getLogger('CrashGen') logger.addFilter(LoggingFilter()) From b251dadab4e2225900f1d576e710eca380782d0f Mon Sep 17 00:00:00 2001 From: Steven Li Date: Sat, 27 Jun 2020 21:23:53 -0700 Subject: [PATCH 02/24] Revert "revert crash_gen.py" This reverts commit abb55ed0744dfe7a8cb7575af07b756e463bfa74. --- tests/pytest/crash_gen.py | 137 +++++++++++++++++++++----------------- 1 file changed, 75 insertions(+), 62 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index cc41fd5e7d..916e8904ff 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -686,7 +686,8 @@ class StateHasData(AnyState): self.assertNoTask(tasks, TaskAddData) # self.hasSuccess(tasks, DeleteDataTasks) else: # should be STATE_HAS_DATA - self.assertNoTask(tasks, TaskDropDb) + if (not self.hasTask(tasks, TaskCreateDb) ): # only if we didn't create one + self.assertNoTask(tasks, TaskDropDb) # we shouldn't have dropped it if (not self.hasTask(tasks, TaskCreateSuperTable)) : # if we didn't create the table self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) @@ -1295,7 +1296,67 @@ class LoggingFilter(logging.Filter): # return False return True +class MainExec: + @classmethod + def runClient(cls): + # resetDb = False # DEBUG only + # dbState = DbState(resetDb) # DBEUG only! + dbManager = DbManager() # Regular function + Dice.seed(0) # initial seeding of dice + thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps) + tc = ThreadCoordinator(thPool, dbManager) + tc.run() + tc.logStats() + dbManager.cleanUp() + + @classmethod + def runService(cls): + print("Running service...") + + @classmethod + def runTemp(cls): # for debugging purposes + # # Hack to exercise reading from disk, imcreasing coverage. 
TODO: fix + # dbc = dbState.getDbConn() + # sTbName = dbState.getFixedSuperTableName() + # dbc.execute("create database if not exists db") + # if not dbState.getState().equals(StateEmpty()): + # dbc.execute("use db") + + # rTables = None + # try: # the super table may not exist + # sql = "select TBNAME from db.{}".format(sTbName) + # logger.info("Finding out tables in super table: {}".format(sql)) + # dbc.query(sql) # TODO: analyze result set later + # logger.info("Fetching result") + # rTables = dbc.getQueryResult() + # logger.info("Result: {}".format(rTables)) + # except taos.error.ProgrammingError as err: + # logger.info("Initial Super table OPS error: {}".format(err)) + + # # sys.exit() + # if ( not rTables == None): + # # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) + # try: + # for rTbName in rTables : # regular tables + # ds = dbState + # logger.info("Inserting into table: {}".format(rTbName[0])) + # sql = "insert into db.{} values ('{}', {});".format( + # rTbName[0], + # ds.getNextTick(), ds.getNextInt()) + # dbc.execute(sql) + # for rTbName in rTables : # regular tables + # dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure + # logger.info("Initial READING operation is successful") + # except taos.error.ProgrammingError as err: + # logger.info("Initial WRITE/READ error: {}".format(err)) + + # Sandbox testing code + # dbc = dbState.getDbConn() + # while True: + # rows = dbc.query("show databases") + # print("Rows: {}, time={}".format(rows, time.time())) + return def main(): # Super cool Python argument library: https://docs.python.org/3/library/argparse.html @@ -1308,24 +1369,27 @@ def main(): 2. You run the server there before this script: ./build/bin/taosd -c test/cfg ''')) + parser.add_argument('-d', '--debug', action='store_true', help='Turn on DEBUG mode for more logging (default: false)') + parser.add_argument('-e', '--run-tdengine', action='store_true', + help='Run TDengine service in foreground (default: false)') parser.add_argument('-l', '--larger-data', action='store_true', help='Write larger amount of data during write operations (default: false)') - parser.add_argument('-p', '--per-thread-db-connection', action='store_true', + parser.add_argument('-p', '--per-thread-db-connection', action='store_false', help='Use a single shared db connection (default: false)') parser.add_argument('-r', '--record-ops', action='store_true', help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)') - parser.add_argument('-s', '--max-steps', action='store', default=100, type=int, + parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int, help='Maximum number of steps to run (default: 100)') - parser.add_argument('-t', '--num-threads', action='store', default=10, type=int, + parser.add_argument('-t', '--num-threads', action='store', default=5, type=int, help='Number of threads to run (default: 10)') global gConfig gConfig = parser.parse_args() - if len(sys.argv) == 1: - parser.print_help() - sys.exit() + # if len(sys.argv) == 1: + # parser.print_help() + # sys.exit() global logger logger = logging.getLogger('CrashGen') @@ -1337,62 +1401,11 @@ def main(): ch = logging.StreamHandler() logger.addHandler(ch) - # resetDb = False # DEBUG only - # dbState = DbState(resetDb) # DBEUG only! 
- dbManager = DbManager() # Regular function - Dice.seed(0) # initial seeding of dice - tc = ThreadCoordinator( - ThreadPool(gConfig.num_threads, gConfig.max_steps), - # WorkDispatcher(dbState), # Obsolete? - dbManager - ) + if gConfig.run_tdengine : # run server + MainExec.runService() + else : + MainExec.runClient() - # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix - # dbc = dbState.getDbConn() - # sTbName = dbState.getFixedSuperTableName() - # dbc.execute("create database if not exists db") - # if not dbState.getState().equals(StateEmpty()): - # dbc.execute("use db") - - # rTables = None - # try: # the super table may not exist - # sql = "select TBNAME from db.{}".format(sTbName) - # logger.info("Finding out tables in super table: {}".format(sql)) - # dbc.query(sql) # TODO: analyze result set later - # logger.info("Fetching result") - # rTables = dbc.getQueryResult() - # logger.info("Result: {}".format(rTables)) - # except taos.error.ProgrammingError as err: - # logger.info("Initial Super table OPS error: {}".format(err)) - - # # sys.exit() - # if ( not rTables == None): - # # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) - # try: - # for rTbName in rTables : # regular tables - # ds = dbState - # logger.info("Inserting into table: {}".format(rTbName[0])) - # sql = "insert into db.{} values ('{}', {});".format( - # rTbName[0], - # ds.getNextTick(), ds.getNextInt()) - # dbc.execute(sql) - # for rTbName in rTables : # regular tables - # dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure - # logger.info("Initial READING operation is successful") - # except taos.error.ProgrammingError as err: - # logger.info("Initial WRITE/READ error: {}".format(err)) - - - - # Sandbox testing code - # dbc = dbState.getDbConn() - # while True: - # rows = dbc.query("show databases") - # print("Rows: {}, time={}".format(rows, time.time())) - - tc.run() - tc.logStats() - dbManager.cleanUp() # logger.info("Crash_Gen execution finished") From 6cb97b3730cf5af306e12b620fbf54a6d0f913fc Mon Sep 17 00:00:00 2001 From: Steven Li Date: Sun, 28 Jun 2020 21:22:21 -0700 Subject: [PATCH 03/24] Discovered Python client problem, ready to file JIRA --- tests/pytest/crash_gen.py | 39 +++++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 94ad63697c..4f3f3a98e8 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -270,7 +270,7 @@ class ThreadCoordinator: # Get ready for next step logger.debug("<-- Step {} finished".format(self._curStep)) self._curStep += 1 # we are about to get into next step. TODO: race condition here! - logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep + logger.debug("\r\n\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep # A new TE for the new step if not failed: # only if not failed @@ -452,6 +452,7 @@ class DbConn: # Open connection self._tdSql = TDSql() self._tdSql.init(self._cursor) + logger.debug("[DB] data connection opened") self.isOpen = True def resetDb(self): # reset the whole database, etc. 
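The hunks in this patch only add debug tracing around opening and closing the native connection; the open sequence itself, defined earlier in crash_gen.py, boils down to roughly the following sketch, which assumes a local taosd started with the test configuration under ../../build/test/cfg:

import taos  # TDengine native Python connector

conn = taos.connect(host="127.0.0.1", config="../../build/test/cfg")
cursor = conn.cursor()
cursor.execute('reset query cache')  # get the connection/cursor ready, as DbConn.open() does
# ... issue statements here, e.g. cursor.execute('show dnodes') ...
cursor.close()
conn.close()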
@@ -470,6 +471,7 @@ class DbConn: if ( not self.isOpen ): raise RuntimeError("Cannot clean up database until connection is open") self._tdSql.close() + logger.debug("[DB] Database connection closed") self.isOpen = False def execute(self, sql): @@ -1010,13 +1012,19 @@ class Task(): self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: errno2 = 0x80000000 + err.errno # positive error number - if ( errno2 in [0x200, 0x360, 0x362, 0x381, 0x380, 0x600 ]) : # allowed errors + if ( errno2 in [0x200, 0x360, 0x362, 0x36A, 0x36B, 0x381, 0x380, 0x383, 0x600 ]) : # allowed errors self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)) print("e", end="", flush=True) self._err = err else: - self.logDebug("[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)) - raise + errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql) + self.logDebug(errMsg) + if gConfig.debug : + raise # so that we see full stack + else: # non-debug + print("\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n" + + "--------------\n".format(errMsg)) + sys.exit(-1) except: self.logDebug("[=] Unexpected exception, SQL: {}".format(self._lastSql)) raise @@ -1388,7 +1396,8 @@ class LoggingFilter(logging.Filter): if ( record.levelno >= logging.INFO ) : return True # info or above always log - msg = record.msg + + # print("type = {}, value={}".format(type(msg), msg)) # sys.exit() @@ -1398,6 +1407,11 @@ class LoggingFilter(logging.Filter): # return False return True +class MyLoggingAdapter(logging.LoggerAdapter): + def process(self, msg, kwargs): + return "[{}]{}".format(threading.get_ident() % 10000, msg), kwargs + # return '[%s] %s' % (self.extra['connid'], msg), kwargs + class MainExec: @classmethod def runClient(cls): @@ -1493,16 +1507,21 @@ def main(): # parser.print_help() # sys.exit() + # Logging Stuff global logger - logger = logging.getLogger('CrashGen') - logger.addFilter(LoggingFilter()) + _logger = logging.getLogger('CrashGen') # real logger + _logger.addFilter(LoggingFilter()) + ch = logging.StreamHandler() + _logger.addHandler(ch) + + logger = MyLoggingAdapter(_logger, []) # Logging adapter, to be used as a logger + if ( gConfig.debug ): logger.setLevel(logging.DEBUG) # default seems to be INFO else: logger.setLevel(logging.INFO) - ch = logging.StreamHandler() - logger.addHandler(ch) - + + # Run server or client if gConfig.run_tdengine : # run server MainExec.runService() else : From 496932f0cc0987242141df4694479a3866d4d596 Mon Sep 17 00:00:00 2001 From: liu0x54 Date: Tue, 30 Jun 2020 01:32:38 +0000 Subject: [PATCH 04/24] [TD-785]check the caller and creater, if not match, return none --- .../python/linux/python3/taos/cursor.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/connector/python/linux/python3/taos/cursor.py b/src/connector/python/linux/python3/taos/cursor.py index 06d6a19462..3bd353e3c5 100644 --- a/src/connector/python/linux/python3/taos/cursor.py +++ b/src/connector/python/linux/python3/taos/cursor.py @@ -1,6 +1,7 @@ from .cinterface import CTaosInterface from .error import * from .constants import FieldType +import threading # querySeqNum = 0 @@ -37,6 +38,7 @@ class TDengineCursor(object): self._block_iter = 0 self._affected_rows = 0 self._logfile = "" + self._threadId = threading.get_ident() if connection is not None: 
self._connection = connection @@ -103,6 +105,11 @@ class TDengineCursor(object): def execute(self, operation, params=None): """Prepare and execute a database operation (query or command). """ + if threading.get_ident() != self._threadId: + info ="Cursor execute:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident()) + print(info) + return None + if not operation: return None @@ -188,6 +195,10 @@ class TDengineCursor(object): def fetchall(self): """Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation. """ + if threading.get_ident() != self._threadId: + info ="Cursor fetchall:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident()) + print(info) + return None if self._result is None or self._fields is None: raise OperationalError("Invalid use of fetchall") @@ -232,6 +243,11 @@ class TDengineCursor(object): def _handle_result(self): """Handle the return result from query. """ + if threading.get_ident() != self._threadId: + info = "Cursor handleresult:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident()) + print(info) + return None + self._description = [] for ele in self._fields: self._description.append( From 30f5a202f9ec4c6e6e10505c39d3fb8fc3a18747 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Mon, 29 Jun 2020 20:05:31 -0700 Subject: [PATCH 05/24] Corrected connection-shared-by-thread flag in Crash_Gen --- .../python/linux/python3/taos/cursor.py | 27 ++++++++++--------- tests/pytest/crash_gen.py | 5 +++- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/connector/python/linux/python3/taos/cursor.py b/src/connector/python/linux/python3/taos/cursor.py index 3bd353e3c5..3f0f315d33 100644 --- a/src/connector/python/linux/python3/taos/cursor.py +++ b/src/connector/python/linux/python3/taos/cursor.py @@ -105,10 +105,11 @@ class TDengineCursor(object): def execute(self, operation, params=None): """Prepare and execute a database operation (query or command). """ - if threading.get_ident() != self._threadId: - info ="Cursor execute:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident()) - print(info) - return None + # if threading.get_ident() != self._threadId: + # info ="Cursor execute:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident()) + # raise OperationalError(info) + # print(info) + # return None if not operation: return None @@ -195,10 +196,11 @@ class TDengineCursor(object): def fetchall(self): """Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation. """ - if threading.get_ident() != self._threadId: - info ="Cursor fetchall:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident()) - print(info) - return None + # if threading.get_ident() != self._threadId: + # info ="[WARNING] Cursor fetchall:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident()) + # raise OperationalError(info) + # print(info) + # return None if self._result is None or self._fields is None: raise OperationalError("Invalid use of fetchall") @@ -243,10 +245,11 @@ class TDengineCursor(object): def _handle_result(self): """Handle the return result from query. 
""" - if threading.get_ident() != self._threadId: - info = "Cursor handleresult:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident()) - print(info) - return None + # if threading.get_ident() != self._threadId: + # info = "Cursor handleresult:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident()) + # raise OperationalError(info) + # print(info) + # return None self._description = [] for ele in self._fields: diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 4f3f3a98e8..ccd99ca6cf 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -106,6 +106,7 @@ class WorkerThread: logger.info("Starting to run thread: {}".format(self._tid)) if ( gConfig.per_thread_db_connection ): # type: ignore + logger.debug("Worker thread openning database connection") self._dbConn.open() self._doTaskLoop() @@ -246,6 +247,7 @@ class ThreadCoordinator: logger.debug("[STT] starting transitions") sm.transition(self._executedTasks) # at end of step, transiton the DB state logger.debug("[STT] transition ended") + # Due to limitation (or maybe not) of the Python library, we cannot share connections across threads if sm.hasDatabase() : for t in self._pool.threadList: logger.debug("[DB] use db for all worker threads") @@ -1492,7 +1494,7 @@ def main(): help='Run TDengine service in foreground (default: false)') parser.add_argument('-l', '--larger-data', action='store_true', help='Write larger amount of data during write operations (default: false)') - parser.add_argument('-p', '--per-thread-db-connection', action='store_false', + parser.add_argument('-p', '--per-thread-db-connection', action='store_true', help='Use a single shared db connection (default: false)') parser.add_argument('-r', '--record-ops', action='store_true', help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)') @@ -1503,6 +1505,7 @@ def main(): global gConfig gConfig = parser.parse_args() + # if len(sys.argv) == 1: # parser.print_help() # sys.exit() From 4b4c0905578d64995742aceb194f0a5c113ff93c Mon Sep 17 00:00:00 2001 From: Steven Li Date: Mon, 29 Jun 2020 23:02:05 -0700 Subject: [PATCH 06/24] Tweaked abort message in Crash_Gen tool --- tests/pytest/crash_gen.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index ccd99ca6cf..4457fd1493 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -1024,8 +1024,8 @@ class Task(): if gConfig.debug : raise # so that we see full stack else: # non-debug - print("\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n" + - "--------------\n".format(errMsg)) + print("\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) + + "----------------------------\n") sys.exit(-1) except: self.logDebug("[=] Unexpected exception, SQL: {}".format(self._lastSql)) From d796aaaa0191f2c9300fd80a2cdd66776f3c4962 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Tue, 30 Jun 2020 21:39:53 -0700 Subject: [PATCH 07/24] Corrected TAOS client error detection --- tests/pytest/crash_gen.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 4457fd1493..21de44ef98 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -292,7 +292,7 @@ class ThreadCoordinator: 
logger.debug("Main thread joining all threads") self._pool.joinAll() # Get all threads to finish - logger.info("All worker thread finished") + logger.info("\nAll worker threads finished") self._execStats.endExec() def logStats(self): @@ -721,10 +721,10 @@ class StateHasData(AnyState): self.assertNoTask(tasks, TaskDropDb) # we must have drop_db task self.hasSuccess(tasks, TaskDropSuperTable) # self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy - elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted - self.assertNoTask(tasks, TaskDropDb) - self.assertNoTask(tasks, TaskDropSuperTable) - self.assertNoTask(tasks, TaskAddData) + # elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted + # self.assertNoTask(tasks, TaskDropDb) + # self.assertNoTask(tasks, TaskDropSuperTable) + # self.assertNoTask(tasks, TaskAddData) # self.hasSuccess(tasks, DeleteDataTasks) else: # should be STATE_HAS_DATA if (not self.hasTask(tasks, TaskCreateDb) ): # only if we didn't create one @@ -1014,9 +1014,9 @@ class Task(): self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: errno2 = 0x80000000 + err.errno # positive error number - if ( errno2 in [0x200, 0x360, 0x362, 0x36A, 0x36B, 0x381, 0x380, 0x383, 0x600 ]) : # allowed errors + if ( errno2 in [0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503, 0x600 ]) : # allowed errors self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)) - print("e", end="", flush=True) + print("_", end="", flush=True) self._err = err else: errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql) @@ -1234,22 +1234,24 @@ class TaskDropSuperTable(StateTransitionTask): tblSeq = list(range(2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))) random.shuffle(tblSeq) tickOutput = False # if we have spitted out a "d" character for "drop regular table" + isSuccess = True for i in tblSeq: - regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i) + regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i) try: - nRows = self.execWtSql(wt, "drop table {}".format(regTableName)) + self.execWtSql(wt, "drop table {}".format(regTableName)) # nRows always 0, like MySQL except taos.error.ProgrammingError as err: errno2 = 0x80000000 + err.errno # positive error number - if ( errno2 in [0x362]) : # allowed errors + if ( errno2 in [0x362]) : # mnode invalid table name + isSuccess = False logger.debug("[DB] Acceptable error when dropping a table") - continue + continue # try to delete next regular table if (not tickOutput): tickOutput = True # Print only one time - if nRows >= 1 : + if isSuccess : print("d", end="", flush=True) else: - print("f({})".format(nRows), end="", flush=True) + print("f", end="", flush=True) # Drop the super table itself tblName = self._dbManager.getFixedSuperTableName() From 88ac7bb6df9068b1f3f66f9ab42932b0e28a0c75 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Wed, 1 Jul 2020 22:33:52 -0700 Subject: [PATCH 08/24] Added -c option to Crash_Gen, enabling REST, but encountered error --- tests/pytest/crash_gen.py | 202 +++++++++++++++++++++++++++++--------- 1 file changed, 158 insertions(+), 44 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 21de44ef98..bd2038ca3a 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -30,6 +30,8 @@ import 
time import logging import datetime import textwrap +import requests +from requests.auth import HTTPBasicAuth from typing import List from typing import Dict @@ -76,7 +78,8 @@ class WorkerThread: # Let us have a DB connection of our own if ( gConfig.per_thread_db_connection ): # type: ignore - self._dbConn = DbConn() + # print("connector_type = {}".format(gConfig.connector_type)) + self._dbConn = DbConn.createNative() if (gConfig.connector_type == 'native') else DbConn.createRest() self._dbInUse = False # if "use db" was executed already @@ -434,15 +437,151 @@ class LinearQueue(): return ret class DbConn: + TYPE_NATIVE = "native-c" + TYPE_REST = "rest-api" + TYPE_INVALID = "invalid" + + @classmethod + def create(cls, connType): + if connType == cls.TYPE_NATIVE: + return DbConnNative() + elif connType == cls.TYPE_REST: + return DbConnRest() + else: + raise RuntimeError("Unexpected connection type: {}".format(connType)) + + @classmethod + def createNative(cls): + return cls.create(cls.TYPE_NATIVE) + + @classmethod + def createRest(cls): + return cls.create(cls.TYPE_REST) + def __init__(self): - self._conn = None - self._cursor = None self.isOpen = False - - def open(self): # Open connection + self._type = self.TYPE_INVALID + + def open(self): if ( self.isOpen ): raise RuntimeError("Cannot re-open an existing DB connection") + # below implemented by child classes + self.openByType() + + logger.debug("[DB] data connection opened, type = {}".format(self._type)) + self.isOpen = True + + def resetDb(self): # reset the whole database, etc. + if ( not self.isOpen ): + raise RuntimeError("Cannot reset database until connection is open") + # self._tdSql.prepare() # Recreate database, etc. + + self.execute('drop database if exists db') + logger.debug("Resetting DB, dropped database") + # self._cursor.execute('create database db') + # self._cursor.execute('use db') + # tdSql.execute('show databases') + + def queryScalar(self, sql) -> int : + return self._queryAny(sql) + + def queryString(self, sql) -> str : + return self._queryAny(sql) + + def _queryAny(self, sql) : # actual query result as an int + if ( not self.isOpen ): + raise RuntimeError("Cannot query database until connection is open") + nRows = self.query(sql) + if nRows != 1 : + raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows)) + if self.getResultRows() != 1 or self.getResultCols() != 1: + raise RuntimeError("Unexpected result set for query: {}".format(sql)) + return self.getQueryResult()[0][0] + + def execute(self, sql): + raise RuntimeError("Unexpected execution, should be overriden") + def openByType(self): + raise RuntimeError("Unexpected execution, should be overriden") + def getQueryResult(self): + raise RuntimeError("Unexpected execution, should be overriden") + def getResultRows(self): + raise RuntimeError("Unexpected execution, should be overriden") + def getResultCols(self): + raise RuntimeError("Unexpected execution, should be overriden") + +# Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql +class DbConnRest(DbConn): + def __init__(self): + super().__init__() + self._type = self.TYPE_REST + self._url = "http://localhost:6020/rest/sql" # fixed for now + self._result = None + + def openByType(self): # Open connection + pass # do nothing, always open + + def close(self): + if ( not self.isOpen ): + raise RuntimeError("Cannot clean up database until connection is open") + # Do nothing for REST + logger.debug("[DB] REST Database connection closed") + self.isOpen = False + + def 
_doSql(self, sql): + r = requests.post(self._url, + data = sql, + auth = HTTPBasicAuth('root', 'taosdata')) + rj = r.json() + # Sanity check for the "Json Result" + if (not 'status' in rj): + raise RuntimeError("No status in REST response") + + if rj['status'] == 'error': # clearly reported error + if (not 'code' in rj): # error without code + raise RuntimeError("REST error return without code") + errno = rj['code'] # May need to massage this in the future + # print("Raising programming error with REST return: {}".format(rj)) + raise taos.error.ProgrammingError(rj['desc'], errno) # todo: check existance of 'desc' + + if rj['status'] != 'succ': # better be this + raise RuntimeError("Unexpected REST return status: {}".format(rj['status'])) + + nRows = rj['rows'] if ('rows' in rj) else 0 + self._result = rj + return nRows + + def execute(self, sql): + if ( not self.isOpen ): + raise RuntimeError("Cannot execute database commands until connection is open") + logger.debug("[SQL-REST] Executing SQL: {}".format(sql)) + nRows = self._doSql(sql) + logger.debug("[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql)) + return nRows + + def query(self, sql) : # return rows affected + return self.execute(sql) + + def getQueryResult(self): + return self._result['data'] + + def getResultRows(self): + print(self._result) + raise RuntimeError("TBD") + # return self._tdSql.queryRows + + def getResultCols(self): + print(self._result) + raise RuntimeError("TBD") + +class DbConnNative(DbConn): + def __init__(self): + super().__init__() + self._type = self.TYPE_REST + self._conn = None + self._cursor = None + + def openByType(self): # Open connection cfgPath = "../../build/test/cfg" self._conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable self._cursor = self._conn.cursor() @@ -454,21 +593,7 @@ class DbConn: # Open connection self._tdSql = TDSql() self._tdSql.init(self._cursor) - logger.debug("[DB] data connection opened") - self.isOpen = True - - def resetDb(self): # reset the whole database, etc. - if ( not self.isOpen ): - raise RuntimeError("Cannot reset database until connection is open") - # self._tdSql.prepare() # Recreate database, etc. 
- - self._cursor.execute('drop database if exists db') - logger.debug("Resetting DB, dropped database") - # self._cursor.execute('create database db') - # self._cursor.execute('use db') - - # tdSql.execute('show databases') - + def close(self): if ( not self.isOpen ): raise RuntimeError("Cannot clean up database until connection is open") @@ -496,22 +621,12 @@ class DbConn: def getQueryResult(self): return self._tdSql.queryResult - def _queryAny(self, sql) : # actual query result as an int - if ( not self.isOpen ): - raise RuntimeError("Cannot query database until connection is open") - tSql = self._tdSql - nRows = tSql.query(sql) - if nRows != 1 : - raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows)) - if tSql.queryRows != 1 or tSql.queryCols != 1: - raise RuntimeError("Unexpected result set for query: {}".format(sql)) - return tSql.queryResult[0][0] + def getResultRows(self): + return self._tdSql.queryRows - def queryScalar(self, sql) -> int : - return self._queryAny(sql) + def getResultCols(self): + return self._tdSql.queryCols - def queryString(self, sql) -> str : - return self._queryAny(sql) class AnyState: STATE_INVALID = -1 @@ -859,7 +974,7 @@ class DbManager(): self._lock = threading.RLock() # self.openDbServerConnection() - self._dbConn = DbConn() + self._dbConn = DbConn.createNative() if (gConfig.connector_type=='native') else DbConn.createRest() try: self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected except taos.error.ProgrammingError as err: @@ -1013,8 +1128,10 @@ class Task(): try: self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: - errno2 = 0x80000000 + err.errno # positive error number - if ( errno2 in [0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503, 0x600 ]) : # allowed errors + errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme + if ( errno2 in [0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503, 0x600, + 1000 # REST catch-all error + ]) : # allowed errors self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)) print("_", end="", flush=True) self._err = err @@ -1239,8 +1356,8 @@ class TaskDropSuperTable(StateTransitionTask): regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i) try: self.execWtSql(wt, "drop table {}".format(regTableName)) # nRows always 0, like MySQL - except taos.error.ProgrammingError as err: - errno2 = 0x80000000 + err.errno # positive error number + except taos.error.ProgrammingError as err: + errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correcting for strange error number scheme if ( errno2 in [0x362]) : # mnode invalid table name isSuccess = False logger.debug("[DB] Acceptable error when dropping a table") @@ -1400,11 +1517,6 @@ class LoggingFilter(logging.Filter): if ( record.levelno >= logging.INFO ) : return True # info or above always log - - - # print("type = {}, value={}".format(type(msg), msg)) - # sys.exit() - # Commenting out below to adjust... 
# if msg.startswith("[TRD]"): @@ -1490,6 +1602,8 @@ def main(): ''')) + parser.add_argument('-c', '--connector-type', action='store', default='native', type=str, + help='Connector type to use: native, rest, or mixed (default: 10)') parser.add_argument('-d', '--debug', action='store_true', help='Turn on DEBUG mode for more logging (default: false)') parser.add_argument('-e', '--run-tdengine', action='store_true', From a519389d38e0a0f69ecf5c3e910fbd43bc84f1cd Mon Sep 17 00:00:00 2001 From: Steven Li Date: Fri, 3 Jul 2020 00:28:26 -0700 Subject: [PATCH 09/24] Now supporting CTRL-C abort in Crash_Gen, plus tracking top numbers --- tests/pytest/crash_gen.py | 294 ++++++++++++++++++++++++++++++++------ 1 file changed, 251 insertions(+), 43 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index bd2038ca3a..9d3c66b97f 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -15,6 +15,8 @@ from __future__ import annotations # For type hinting before definition, ref: h import sys import os +import io +import signal import traceback # Require Python 3 if sys.version_info[0] < 3: @@ -36,6 +38,8 @@ from requests.auth import HTTPBasicAuth from typing import List from typing import Dict from typing import Set +from typing import IO +from queue import Queue, Empty from util.log import * from util.dnodes import * @@ -205,6 +209,7 @@ class WorkerThread: # else: # return self._tc.getDbState().getDbConn().query(sql) +# The coordinator of all worker threads, mostly running in main thread class ThreadCoordinator: def __init__(self, pool: ThreadPool, dbManager): self._curStep = -1 # first step is 0 @@ -217,6 +222,7 @@ class ThreadCoordinator: self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads self._execStats = ExecutionStats() + self._runStatus = MainExec.STATUS_RUNNING def getTaskExecutor(self): return self._te @@ -227,6 +233,10 @@ class ThreadCoordinator: def crossStepBarrier(self): self._stepBarrier.wait() + def requestToStop(self): + self._runStatus = MainExec.STATUS_STOPPING + self._execStats.registerFailure("User Interruption") + def run(self): self._pool.createAndStartThreads(self) @@ -234,41 +244,56 @@ class ThreadCoordinator: self._curStep = -1 # not started yet maxSteps = gConfig.max_steps # type: ignore self._execStats.startExec() # start the stop watch - failed = False - while(self._curStep < maxSteps-1 and not failed): # maxStep==10, last curStep should be 9 + transitionFailed = False + hasAbortedTask = False + while(self._curStep < maxSteps-1 and + (not transitionFailed) and + (self._runStatus==MainExec.STATUS_RUNNING) and + (not hasAbortedTask)): # maxStep==10, last curStep should be 9 + if not gConfig.debug: print(".", end="", flush=True) # print this only if we are not in debug mode logger.debug("[TRD] Main thread going to sleep") - # Now ready to enter a step + # Now main thread (that's us) is ready to enter a step self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate self._stepBarrier.reset() # Other worker threads should now be at the "gate" # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" - try: - sm = self._dbManager.getStateMachine() - logger.debug("[STT] starting transitions") - sm.transition(self._executedTasks) # at end of step, transiton the DB state - logger.debug("[STT] transition ended") - # Due to limitation (or maybe not) of the Python library, we cannot share connections across threads - if 
sm.hasDatabase() : - for t in self._pool.threadList: - logger.debug("[DB] use db for all worker threads") - t.useDb() - # t.execSql("use db") # main thread executing "use db" on behalf of every worker thread + # We use this period to do house keeping work, when all worker threads are QUIET. + hasAbortedTask = False + for task in self._executedTasks : + if task.isAborted() : + print("Task aborted: {}".format(task)) + hasAbortedTask = True + break - except taos.error.ProgrammingError as err: - if ( err.msg == 'network unavailable' ): # broken DB connection - logger.info("DB connection broken, execution failed") - traceback.print_stack() - failed = True - self._te = None # Not running any more - self._execStats.registerFailure("Broken DB Connection") - # continue # don't do that, need to tap all threads at end, and maybe signal them to stop - else: - raise - finally: - pass + if hasAbortedTask : # do transition only if tasks are error free + self._execStats.registerFailure("Aborted Task Encountered") + else: + try: + sm = self._dbManager.getStateMachine() + logger.debug("[STT] starting transitions") + sm.transition(self._executedTasks) # at end of step, transiton the DB state + logger.debug("[STT] transition ended") + # Due to limitation (or maybe not) of the Python library, we cannot share connections across threads + if sm.hasDatabase() : + for t in self._pool.threadList: + logger.debug("[DB] use db for all worker threads") + t.useDb() + # t.execSql("use db") # main thread executing "use db" on behalf of every worker thread + except taos.error.ProgrammingError as err: + if ( err.msg == 'network unavailable' ): # broken DB connection + logger.info("DB connection broken, execution failed") + traceback.print_stack() + transitionFailed = True + self._te = None # Not running any more + self._execStats.registerFailure("Broken DB Connection") + # continue # don't do that, need to tap all threads at end, and maybe signal them to stop + else: + raise + # finally: + # pass self.resetExecutedTasks() # clear the tasks after we are done @@ -278,14 +303,14 @@ class ThreadCoordinator: logger.debug("\r\n\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep # A new TE for the new step - if not failed: # only if not failed + if not transitionFailed: # only if not failed self._te = TaskExecutor(self._curStep) logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep - self.tapAllThreads() + self.tapAllThreads() # Worker threads will wake up at this point, and each execute it's own task logger.debug("Main thread ready to finish up...") - if not failed: # only in regular situations + if not transitionFailed: # only in regular situations self.crossStepBarrier() # Cross it one last time, after all threads finish self._stepBarrier.reset() logger.debug("Main thread in exclusive zone...") @@ -298,8 +323,8 @@ class ThreadCoordinator: logger.info("\nAll worker threads finished") self._execStats.endExec() - def logStats(self): - self._execStats.logStats() + def printStats(self): + self._execStats.printStats() def tapAllThreads(self): # in a deterministic manner wakeSeq = [] @@ -1061,15 +1086,60 @@ class DbManager(): self._dbConn.close() class TaskExecutor(): + class BoundedList: + def __init__(self, size = 10): + self._size = size + self._list = [] + + def add(self, n: int) : + if not self._list: # empty + self._list.append(n) + return + # now we should insert + nItems 
= len(self._list) + insPos = 0 + for i in range(nItems): + insPos = i + if n <= self._list[i] : # smaller than this item, time to insert + break # found the insertion point + insPos += 1 # insert to the right + + if insPos == 0 : # except for the 1st item, # TODO: elimiate first item as gating item + return # do nothing + + # print("Inserting at postion {}, value: {}".format(insPos, n)) + self._list.insert(insPos, n) # insert + + newLen = len(self._list) + if newLen <= self._size : + return # do nothing + elif newLen == (self._size + 1) : + del self._list[0] # remove the first item + else : + raise RuntimeError("Corrupt Bounded List") + + def __str__(self): + return repr(self._list) + + _boundedList = BoundedList() + def __init__(self, curStep): self._curStep = curStep + @classmethod + def getBoundedList(cls): + return cls._boundedList + def getCurStep(self): return self._curStep def execute(self, task: Task, wt: WorkerThread): # execute a task on a thread task.execute(wt) + def recordDataMark(self, n: int): + # print("[{}]".format(n), end="", flush=True) + self._boundedList.add(n) + # def logInfo(self, msg): # logger.info(" T[{}.x]: ".format(self._curStep) + msg) @@ -1089,6 +1159,7 @@ class Task(): self._dbManager = dbManager self._workerThread = None self._err = None + self._aborted = False self._curStep = None self._numRows = None # Number of rows affected @@ -1102,6 +1173,9 @@ class Task(): def isSuccess(self): return self._err == None + def isAborted(self): + return self._aborted + def clone(self): # TODO: why do we need this again? newTask = self.__class__(self._dbManager, self._execStats) return newTask @@ -1143,7 +1217,9 @@ class Task(): else: # non-debug print("\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) + "----------------------------\n") - sys.exit(-1) + # sys.exit(-1) + self._err = err + self._aborted = True except: self.logDebug("[=] Unexpected exception, SQL: {}".format(self._lastSql)) raise @@ -1213,7 +1289,7 @@ class ExecutionStats: self._failed = True self._failureReason = reason - def logStats(self): + def printStats(self): logger.info("----------------------------------------------------------------------") logger.info("| Crash_Gen test {}, with the following stats:". 
format("FAILED (reason: {})".format(self._failureReason) if self._failed else "SUCCEEDED")) @@ -1228,6 +1304,7 @@ class ExecutionStats: logger.info("| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(self._accRunTime)) logger.info("| Average Per-Task Execution Time: {:.3f} seconds".format(self._accRunTime/execTimesAny)) logger.info("| Total Elapsed Time (from wall clock): {:.3f} seconds".format(self._elapsedTime)) + logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList())) logger.info("----------------------------------------------------------------------") @@ -1449,6 +1526,8 @@ class TaskAddData(StateTransitionTask): ds.getNextBinary(), ds.getNextFloat(), ds.getNextTick(), nextInt) self.execWtSql(wt, sql) + # Successfully wrote the data into the DB, let's record it somehow + te.recordDataMark(nextInt) if gConfig.record_ops: self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) self.fAddLogDone.flush() @@ -1528,23 +1607,152 @@ class MyLoggingAdapter(logging.LoggerAdapter): return "[{}]{}".format(threading.get_ident() % 10000, msg), kwargs # return '[%s] %s' % (self.extra['connid'], msg), kwargs -class MainExec: - @classmethod - def runClient(cls): - # resetDb = False # DEBUG only - # dbState = DbState(resetDb) # DBEUG only! +class SvcManager: + + def __init__(self): + print("Starting service manager") + signal.signal(signal.SIGTERM, self.sigIntHandler) + signal.signal(signal.SIGINT, self.sigIntHandler) + self.ioThread = None + self.subProcess = None + self.shouldStop = False + self.status = MainExec.STATUS_RUNNING + + def svcOutputReader(self, out: IO, queue): + # print("This is the svcOutput Reader...") + for line in out : # iter(out.readline, b''): + # print("Finished reading a line: {}".format(line)) + queue.put(line.rstrip()) # get rid of new lines + print("No more output from incoming IO") # meaning sub process must have died + out.close() + + def sigIntHandler(self, signalNumber, frame): + if self.status != MainExec.STATUS_RUNNING : + print("Ignoring repeated SIGINT...") + return # do nothing if it's already not running + self.status = MainExec.STATUS_STOPPING # immediately set our status + + print("Terminating program...") + self.subProcess.send_signal(signal.SIGINT) + self.shouldStop = True + self.joinIoThread() + + def joinIoThread(self): + if self.ioThread : + self.ioThread.join() + self.ioThread = None + + def run(self): + ON_POSIX = 'posix' in sys.builtin_module_names + svcCmd = ['../../build/build/bin/taosd', '-c', '../../build/test/cfg'] + # svcCmd = ['vmstat', '1'] + self.subProcess = subprocess.Popen(svcCmd, stdout=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX, text=True) + q = Queue() + self.ioThread = threading.Thread(target=self.svcOutputReader, args=(self.subProcess.stdout, q)) + self.ioThread.daemon = True # thread dies with the program + self.ioThread.start() + + # proc = subprocess.Popen(['echo', '"to stdout"'], + # stdout=subprocess.PIPE, + # ) + # stdout_value = proc.communicate()[0] + # print('\tstdout: {}'.format(repr(stdout_value))) + + while True : + try: + line = q.get_nowait() # getting output at fast speed + except Empty: + # print('no output yet') + time.sleep(2.3) # wait only if there's no output + else: # got line + print(line) + # print("----end of iteration----") + if self.shouldStop: + print("Ending main Svc thread") + break + + print("end of loop") + + self.joinIoThread() + print("Finished") + +class ClientManager: + def __init__(self): + print("Starting service manager") 
+ signal.signal(signal.SIGTERM, self.sigIntHandler) + signal.signal(signal.SIGINT, self.sigIntHandler) + + self.status = MainExec.STATUS_RUNNING + self.tc = None + + def sigIntHandler(self, signalNumber, frame): + if self.status != MainExec.STATUS_RUNNING : + print("Ignoring repeated SIGINT...") + return # do nothing if it's already not running + self.status = MainExec.STATUS_STOPPING # immediately set our status + + print("Terminating program...") + self.tc.requestToStop() + + def _printLastNumbers(self): # to verify data durability + dbManager = DbManager(resetDb=False) + dbc = dbManager.getDbConn() + if dbc.query("show databases") == 0 : # no databae + return + + dbc.execute("use db") + sTbName = dbManager.getFixedSuperTableName() + + # get all regular tables + dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later + rTables = dbc.getQueryResult() + + bList = TaskExecutor.BoundedList() + for rTbName in rTables : # regular tables + dbc.query("select speed from db.{}".format(rTbName[0])) + numbers = dbc.getQueryResult() + for row in numbers : + # print("<{}>".format(n), end="", flush=True) + bList.add(row[0]) + + print("Top numbers in DB right now: {}".format(bList)) + print("TDengine client execution is about to start in 2 seconds...") + time.sleep(2.0) + dbManager = None # release? + + def prepare(self): + self._printLastNumbers() + + def run(self): + self._printLastNumbers() + dbManager = DbManager() # Regular function Dice.seed(0) # initial seeding of dice thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps) - tc = ThreadCoordinator(thPool, dbManager) + self.tc = ThreadCoordinator(thPool, dbManager) - tc.run() - tc.logStats() - dbManager.cleanUp() + self.tc.run() + self.conclude() + + def conclude(self): + self.tc.printStats() + self.tc.getDbManager().cleanUp() + + +class MainExec: + STATUS_RUNNING = 1 + STATUS_STOPPING = 2 + # STATUS_STOPPED = 3 # Not used yet + + @classmethod + def runClient(cls): + clientManager = ClientManager() + clientManager.run() @classmethod def runService(cls): - print("Running service...") + svcManager = SvcManager() + svcManager.run() @classmethod def runTemp(cls): # for debugging purposes From 5c2b0097a770f6e889485eaaf5471f3c0e551d3d Mon Sep 17 00:00:00 2001 From: Steven Li Date: Mon, 6 Jul 2020 03:33:11 +0000 Subject: [PATCH 10/24] Minor tweak of crash_gen tool --- src/wal/src/walMain.c | 1 + tests/pytest/crash_gen.py | 19 +++++++++++++++---- tests/pytest/crash_gen.sh | 2 +- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/wal/src/walMain.c b/src/wal/src/walMain.c index e079653ab3..ae4beabb37 100644 --- a/src/wal/src/walMain.c +++ b/src/wal/src/walMain.c @@ -196,6 +196,7 @@ void walFsync(void *handle) { if (pWal == NULL) return; if (pWal->level == TAOS_WAL_FSYNC && pWal->fd >=0) { + printf("WAL-SYNC executed\n"); if (fsync(pWal->fd) < 0) { wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno)); } diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 9d3c66b97f..a6e2414262 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -1,4 +1,4 @@ -#!/usr/bin/python3.7 +#-----!/usr/bin/python3.7 ################################################################### # Copyright (c) 2016 by TAOS Technologies, Inc. # All rights reserved. 
@@ -25,6 +25,7 @@ if sys.version_info[0] < 3: import getopt import argparse import copy +import requests import threading import random @@ -1061,8 +1062,11 @@ class DbManager(): def getNextTick(self): with self._lock: # prevent duplicate tick - self._lastTick += datetime.timedelta(0, 1) # add one second to it - return self._lastTick + if Dice.throw(10) == 0 : # 1 in 10 chance + return self._lastTick + datetime.timedelta(0, -100) + else: # regular + self._lastTick += datetime.timedelta(0, 1) # add one second to it + return self._lastTick def getNextInt(self): with self._lock: @@ -1220,7 +1224,12 @@ class Task(): # sys.exit(-1) self._err = err self._aborted = True - except: + except Exception as e : + self.logInfo("Non-TAOS exception encountered") + self._err = e + self._aborted = True + traceback.print_exc() + except : self.logDebug("[=] Unexpected exception, SQL: {}".format(self._lastSql)) raise self._execStats.endTaskType(self.__class__.__name__, self.isSuccess()) @@ -1699,6 +1708,8 @@ class ClientManager: dbc = dbManager.getDbConn() if dbc.query("show databases") == 0 : # no databae return + if dbc.query("show tables") == 0 : # no tables + return dbc.execute("use db") sTbName = dbManager.getFixedSuperTableName() diff --git a/tests/pytest/crash_gen.sh b/tests/pytest/crash_gen.sh index c845b39764..de80361aa3 100755 --- a/tests/pytest/crash_gen.sh +++ b/tests/pytest/crash_gen.sh @@ -38,4 +38,4 @@ export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3 export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)/../../build/build/lib # Now we are all let, and let's see if we can find a crash. Note we pass all params -./crash_gen.py $@ +python3 ./crash_gen.py $@ From a74e3bff4d004e568d288cb37cf8d286037fdb9f Mon Sep 17 00:00:00 2001 From: Steven Li Date: Mon, 6 Jul 2020 03:34:03 +0000 Subject: [PATCH 11/24] reverting a mistaken change --- src/wal/src/walMain.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/wal/src/walMain.c b/src/wal/src/walMain.c index ae4beabb37..e079653ab3 100644 --- a/src/wal/src/walMain.c +++ b/src/wal/src/walMain.c @@ -196,7 +196,6 @@ void walFsync(void *handle) { if (pWal == NULL) return; if (pWal->level == TAOS_WAL_FSYNC && pWal->fd >=0) { - printf("WAL-SYNC executed\n"); if (fsync(pWal->fd) < 0) { wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno)); } From 5686d65bacec8b7f542bd9a1cdc424a194a9a074 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Mon, 6 Jul 2020 05:10:32 +0000 Subject: [PATCH 12/24] Added return code to crash_gen tool --- tests/pytest/crash_gen.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index a6e2414262..49c428b7f1 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -327,6 +327,12 @@ class ThreadCoordinator: def printStats(self): self._execStats.printStats() + def isFailed(self): + return self._execStats.isFailed() + + def getExecStats(self): + return self._execStats + def tapAllThreads(self): # in a deterministic manner wakeSeq = [] for i in range(self._pool.numThreads): # generate a random sequence @@ -1007,7 +1013,7 @@ class DbManager(): # print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err)) if ( err.msg == 'client disconnected' ): # cannot open DB connection print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.") - sys.exit() + sys.exit(2) else: raise except: @@ -1267,6 +1273,12 @@ class ExecutionStats: self._failed = False 
self._failureReason = None + def __str__(self): + return "[ExecStats: _failed={}, _failureReason={}".format(self._failed, self._failureReason) + + def isFailed(self): + return self._failed == True + def startExec(self): self._execStartTime = time.time() @@ -1743,7 +1755,11 @@ class ClientManager: self.tc = ThreadCoordinator(thPool, dbManager) self.tc.run() + # print("exec stats: {}".format(self.tc.getExecStats())) + # print("TC failed = {}".format(self.tc.isFailed())) self.conclude() + # print("TC failed (2) = {}".format(self.tc.isFailed())) + return 1 if self.tc.isFailed() else 0 # Linux return code: ref https://shapeshed.com/unix-exit-codes/ def conclude(self): self.tc.printStats() @@ -1758,7 +1774,7 @@ class MainExec: @classmethod def runClient(cls): clientManager = ClientManager() - clientManager.run() + return clientManager.run() @classmethod def runService(cls): @@ -1863,10 +1879,12 @@ def main(): if gConfig.run_tdengine : # run server MainExec.runService() else : - MainExec.runClient() + return MainExec.runClient() # logger.info("Crash_Gen execution finished") if __name__ == "__main__": - main() + exitCode = main() + # print("Exiting with code: {}".format(exitCode)) + sys.exit(exitCode) From b03ed5253def8a6f78041f01d77db872549dbdd4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 6 Jul 2020 14:16:01 +0000 Subject: [PATCH 13/24] [TD-860] change sync confirm in sdb --- src/dnode/src/dnodeMWrite.c | 6 +- src/mnode/src/mnodeSdb.c | 112 +++++++++++++++++------------------- 2 files changed, 56 insertions(+), 62 deletions(-) diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index 90d857155a..a6aed29e3b 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -131,8 +131,8 @@ static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) { taosFreeQitem(pWrite); } -void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) { - SMnodeMsg *pWrite = pRaw; +void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code) { + SMnodeMsg *pWrite = pMsg; if (pWrite == NULL) return; if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) { @@ -140,6 +140,8 @@ void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) { return; } + if (code > 0) return; + SRpcMsg rpcRsp = { .handle = pWrite->rpcMsg.handle, .pCont = pWrite->rpcRsp.rsp, diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 761dce6720..cdcb426ba2 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -72,8 +72,6 @@ typedef struct { void * sync; void * wal; SSyncCfg cfg; - sem_t sem; - int32_t code; int32_t numOfTables; SSdbTable *tableList[SDB_TABLE_MAX]; pthread_mutex_t mutex; @@ -244,27 +242,19 @@ static void sdbNotifyRole(void *ahandle, int8_t role) { sdbUpdateMnodeRoles(); } +FORCE_INLINE static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { - tsSdbObj.code = code; - sem_post(&tsSdbObj.sem); - sdbDebug("forward request confirmed, version:%" PRIu64 ", result:%s", (int64_t)param, tstrerror(code)); -} + SMnodeMsg *pMsg = param; - static int32_t sdbForwardToPeer(SWalHead *pHead) { - if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS; - - int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version, TAOS_QTYPE_RPC); - if (code > 0) { - sdbDebug("forward request is sent, version:%" PRIu64 ", code:%d", pHead->version, code); - sem_wait(&tsSdbObj.sem); - return tsSdbObj.code; - } - return code; + if (pMsg) { + sdbDebug("app:%p:%p, forward request is confirmed, result:%s", pMsg->rpcMsg.ahandle, 
pMsg, tstrerror(code)); + } + dnodeSendRpcMnodeWriteRsp(pMsg, code); } void sdbUpdateSync() { SSyncCfg syncCfg = {0}; - int32_t index = 0; + int32_t index = 0; SDMMnodeInfos *mnodes = dnodeGetMnodeInfos(); for (int32_t i = 0; i < mnodes->nodeNum; ++i) { @@ -298,7 +288,7 @@ void sdbUpdateSync() { } syncCfg.replica = index; - syncCfg.quorum = (syncCfg.replica == 1) ? 1:2; + syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2; bool hasThisDnode = false; for (int32_t i = 0; i < syncCfg.replica; ++i) { @@ -325,10 +315,10 @@ void sdbUpdateSync() { syncInfo.getWalInfo = sdbGetWalInfo; syncInfo.getFileInfo = sdbGetFileInfo; syncInfo.writeToCache = sdbWriteToQueue; - syncInfo.confirmForward = sdbConfirmForward; + syncInfo.confirmForward = sdbConfirmForward; syncInfo.notifyRole = sdbNotifyRole; tsSdbObj.cfg = syncCfg; - + if (tsSdbObj.sync) { syncReconfig(tsSdbObj.sync, &syncCfg); } else { @@ -339,7 +329,6 @@ void sdbUpdateSync() { int32_t sdbInit() { pthread_mutex_init(&tsSdbObj.mutex, NULL); - sem_init(&tsSdbObj.sem, 0, 0); if (sdbInitWriteWorker() != 0) { return -1; @@ -379,7 +368,6 @@ void sdbCleanUp() { tsSdbObj.wal = NULL; } - sem_destroy(&tsSdbObj.sem); pthread_mutex_destroy(&tsSdbObj.mutex); } @@ -513,24 +501,22 @@ static int sdbWrite(void *param, void *data, int type) { assert(pTable != NULL); pthread_mutex_lock(&tsSdbObj.mutex); + if (pHead->version == 0) { - // assign version + // assign version tsSdbObj.version++; pHead->version = tsSdbObj.version; } else { // for data from WAL or forward, version may be smaller if (pHead->version <= tsSdbObj.version) { pthread_mutex_unlock(&tsSdbObj.mutex); - if (type == TAOS_QTYPE_FWD && tsSdbObj.sync != NULL) { - sdbDebug("forward request is received, version:%" PRIu64 " confirm it", pHead->version); - syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS); - } + sdbDebug("table:%s, failed to restore %s record:%s from source(%d), version:%" PRId64 " too large, sdb version:%" PRId64, + pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version); return TSDB_CODE_SUCCESS; } else if (pHead->version != tsSdbObj.version + 1) { pthread_mutex_unlock(&tsSdbObj.mutex); - sdbError("table:%s, failed to restore %s record:%s from wal, version:%" PRId64 " too large, sdb version:%" PRId64, - pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, - tsSdbObj.version); + sdbError("table:%s, failed to restore %s record:%s from source(%d), version:%" PRId64 " too large, sdb version:%" PRId64, + pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version); return TSDB_CODE_MND_APP_ERROR; } else { tsSdbObj.version = pHead->version; @@ -543,27 +529,33 @@ static int sdbWrite(void *param, void *data, int type) { return code; } - code = sdbForwardToPeer(pHead); + + // forward to peers, even it is WAL/FWD, it shall be called to update version in sync + void *mhandle = NULL; + if (pOper != NULL) mhandle = pOper->pMsg; + int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, mhandle, TAOS_QTYPE_RPC); pthread_mutex_unlock(&tsSdbObj.mutex); + if (syncCode < 0) { + sdbDebug("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, + tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + return syncCode; + } else if (syncCode > 0) { + sdbDebug("table:%s, forward request is sent, result:%s action:%s record:%s version:%" 
PRId64, pTable->tableName, + tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + } else {} + // from app, oper is created if (pOper != NULL) { - sdbTrace("record from app is disposed, table:%s action:%s record:%s version:%" PRIu64 " result:%s", - pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, - tstrerror(code)); - return code; + sdbDebug("table:%s, record from app is disposed, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, + tstrerror(code), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + return syncCode; + } else { + sdbDebug("table:%s, record from wal/fwd is disposed, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, + tstrerror(code), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); } // from wal or forward msg, oper not created, should add into hash - if (tsSdbObj.sync != NULL) { - sdbTrace("record from wal forward is disposed, table:%s action:%s record:%s version:%" PRIu64 " confirm it", - pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); - syncConfirmForward(tsSdbObj.sync, pHead->version, code); - } else { - sdbTrace("record from wal restore is disposed, table:%s action:%s record:%s version:%" PRIu64, pTable->tableName, - sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); - } - if (action == SDB_ACTION_INSERT) { SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable}; code = (*pTable->decodeFp)(&oper); @@ -944,17 +936,16 @@ static void *sdbWorkerFp(void *param) { if (type == TAOS_QTYPE_RPC) { pOper = (SSdbOper *)item; pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK; + if (pOper->pMsg != NULL) { + sdbDebug("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue", + pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj, + sdbGetKeyStr(pOper->table, pHead->cont), pHead->version); + } } else { pHead = (SWalHead *)item; pOper = NULL; } - if (pOper != NULL && pOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue", - pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj, - sdbGetKeyStr(pOper->table, pHead->cont), pHead->version); - } - int32_t code = sdbWrite(pOper, pHead, type); if (pOper) pOper->retCode = code; } @@ -965,23 +956,24 @@ static void *sdbWorkerFp(void *param) { taosResetQitems(tsSdbWriteQall); for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(tsSdbWriteQall, &type, &item); + if (type == TAOS_QTYPE_RPC) { pOper = (SSdbOper *)item; - if (pOper != NULL && pOper->cb != NULL) { + if (pOper == NULL) { + taosFreeQitem(item); + continue; + } + + if (pOper->cb != NULL) { sdbTrace("app:%p:%p, will do callback func, index:%d", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, i); pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode); } - if (pOper != NULL && pOper->pMsg != NULL) { - sdbTrace("app:%p:%p, msg is processed, result:%s", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, - tstrerror(pOper->retCode)); - } - - if (pOper != NULL) { - sdbDecRef(pOper->table, pOper->pObj); - } - dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode); + sdbDecRef(pOper->table, pOper->pObj); + } else if (type == TAOS_QTYPE_FWD) { + syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS); + } else { } taosFreeQitem(item); } 
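
For orientation between these two patches: the sdb write path above is moving from a blocking sem_wait on the forward to a deferred reply that is completed by the confirm callback (syncForwardToPeer returning a positive code means "sent, confirmation comes later"). Below is a minimal, self-contained sketch of that pattern, assuming hypothetical stand-ins; Request, forward_to_peer, on_peer_confirm, handle_write and send_response are illustrative names only, not TDengine APIs, and only the result convention (>0 pending, 0 nothing to forward, <0 error) mirrors the handling in the diff above.

#include <stdio.h>

typedef struct { int done; } Request;

static void send_response(Request *req, int code) {
  if (req->done) return;               /* reply to the client at most once */
  req->done = 1;
  printf("reply to client, code=%d\n", code);
}

/* >0: forwarded, confirmation arrives later; 0: nothing to forward; <0: error */
static int forward_to_peer(void) { return 1; }

/* confirm callback, invoked when the peer acknowledges the forwarded record */
static void on_peer_confirm(Request *req, int code) {
  send_response(req, code);
}

static int handle_write(Request *req) {
  int code = 0;                        /* local write succeeded */
  int syncCode = forward_to_peer();
  if (syncCode < 0) {                  /* forward failed: report the error now */
    send_response(req, syncCode);
    return syncCode;
  }
  if (syncCode > 0) return syncCode;   /* defer: the confirm callback replies */
  send_response(req, code);            /* nothing pending: reply immediately */
  return code;
}

int main(void) {
  Request req = {0};
  if (handle_write(&req) > 0) {
    on_peer_confirm(&req, 0);          /* simulate the peer's confirmation */
  }
  return 0;
}
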
From a2e97d8ec29e6bd29087428fcf64baf07ba2384f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 7 Jul 2020 03:36:59 +0000 Subject: [PATCH 14/24] [TD-860] --- src/dnode/src/dnodeMWrite.c | 2 -- src/mnode/src/mnodeSdb.c | 48 ++++++++++++++++++++----------------- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index a6aed29e3b..b53c66e00c 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -140,8 +140,6 @@ void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code) { return; } - if (code > 0) return; - SRpcMsg rpcRsp = { .handle = pWrite->rpcMsg.handle, .pCont = pWrite->rpcRsp.rsp, diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index cdcb426ba2..70354cf550 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -244,12 +244,20 @@ static void sdbNotifyRole(void *ahandle, int8_t role) { FORCE_INLINE static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { - SMnodeMsg *pMsg = param; + if (code > 0) return; - if (pMsg) { - sdbDebug("app:%p:%p, forward request is confirmed, result:%s", pMsg->rpcMsg.ahandle, pMsg, tstrerror(code)); + assert(param); + + SSdbOper * pOper = param; + SMnodeMsg *pMsg = pOper->pMsg; + + if (pOper->cb != NULL) { + sdbDebug("app:%p:%p, is confirmed and will do callback func", pMsg->rpcMsg.ahandle, pMsg); + pOper->retCode = (*pOper->cb)(pMsg, code); } - dnodeSendRpcMnodeWriteRsp(pMsg, code); + + dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode); + taosFreeQitem(pOper); } void sdbUpdateSync() { @@ -529,7 +537,6 @@ static int sdbWrite(void *param, void *data, int type) { return code; } - // forward to peers, even it is WAL/FWD, it shall be called to update version in sync void *mhandle = NULL; if (pOper != NULL) mhandle = pOper->pMsg; @@ -541,18 +548,19 @@ static int sdbWrite(void *param, void *data, int type) { tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); return syncCode; } else if (syncCode > 0) { - sdbDebug("table:%s, forward request is sent, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, - tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); - } else {} + sdbDebug("table:%s, forward request is sent, syncCode:%d action:%s record:%s version:%" PRId64, pTable->tableName, + syncCode, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + } else { + } // from app, oper is created if (pOper != NULL) { - sdbDebug("table:%s, record from app is disposed, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, - tstrerror(code), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + sdbDebug("table:%s, record from app is disposed, action:%s record:%s version:%" PRId64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); return syncCode; } else { - sdbDebug("table:%s, record from wal/fwd is disposed, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, - tstrerror(code), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + sdbDebug("table:%s, record from wal/fwd is disposed, action:%s record:%s version:%" PRId64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); } // from wal or forward msg, oper not created, should add into hash @@ -619,7 +627,7 @@ int32_t sdbInsertRow(SSdbOper *pOper) { memcpy(pNewOper, pOper, 
sizeof(SSdbOper)); if (pNewOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, + sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); } @@ -669,7 +677,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { memcpy(pNewOper, pOper, sizeof(SSdbOper)); if (pNewOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, + sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); } @@ -719,7 +727,7 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { memcpy(pNewOper, pOper, sizeof(SSdbOper)); if (pNewOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, + sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); } @@ -964,18 +972,14 @@ static void *sdbWorkerFp(void *param) { continue; } - if (pOper->cb != NULL) { - sdbTrace("app:%p:%p, will do callback func, index:%d", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, i); - pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode); - } - - dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode); sdbDecRef(pOper->table, pOper->pObj); + sdbConfirmForward(NULL, pOper, pOper->retCode); } else if (type == TAOS_QTYPE_FWD) { syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS); + taosFreeQitem(item); } else { + taosFreeQitem(item); } - taosFreeQitem(item); } } From 94e50c413ea0f811f484b1badc794ebe62076598 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 7 Jul 2020 03:54:18 +0000 Subject: [PATCH 15/24] [TD-860] --- src/mnode/src/mnodeSdb.c | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 70354cf550..0f657bdde8 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -244,19 +244,26 @@ static void sdbNotifyRole(void *ahandle, int8_t role) { FORCE_INLINE static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { - if (code > 0) return; - assert(param); - SSdbOper * pOper = param; SMnodeMsg *pMsg = pOper->pMsg; - if (pOper->cb != NULL) { - sdbDebug("app:%p:%p, is confirmed and will do callback func", pMsg->rpcMsg.ahandle, pMsg); - pOper->retCode = (*pOper->cb)(pMsg, code); + if (code > 0) { + if (pMsg != NULL) { + sdbDebug("app:%p:%p, waiting for slave to confirm this operation", pMsg->rpcMsg.ahandle, pMsg); + } + return; } - dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode); + if (pMsg != NULL) { + sdbDebug("app:%p:%p, is confirmed and will do callback func, code:%s", pMsg->rpcMsg.ahandle, pMsg, tstrerror(code)); + } + + if (pOper->cb != NULL) { + code = (*pOper->cb)(pMsg, code); + } + + dnodeSendRpcMnodeWriteRsp(pMsg, code); taosFreeQitem(pOper); } @@ -538,9 +545,7 @@ static int sdbWrite(void *param, void *data, int type) { } // forward to peers, even it is WAL/FWD, it shall be called to update version in sync - void *mhandle = NULL; - if (pOper != NULL) mhandle = pOper->pMsg; - int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, mhandle, 
TAOS_QTYPE_RPC); + int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC); pthread_mutex_unlock(&tsSdbObj.mutex); if (syncCode < 0) { From e35d89a4b47a2a315d463b32e381e0416735e5af Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 7 Jul 2020 09:56:20 +0000 Subject: [PATCH 16/24] [TD-860] add processed count for sdb sync --- src/mnode/inc/mnodeSdb.h | 1 + src/mnode/src/mnodeSdb.c | 54 ++++++++++++++++++++-------------------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h index ca2fffe24c..eec6d45e23 100644 --- a/src/mnode/inc/mnodeSdb.h +++ b/src/mnode/inc/mnodeSdb.h @@ -53,6 +53,7 @@ typedef struct { void * rowData; int32_t rowSize; int32_t retCode; // for callback in sdb queue + int32_t processedCount; // for sync fwd callback int32_t (*cb)(struct SMnodeMsg *pMsg, int32_t code); struct SMnodeMsg *pMsg; } SSdbOper; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 0f657bdde8..4aed820958 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -247,20 +247,22 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { assert(param); SSdbOper * pOper = param; SMnodeMsg *pMsg = pOper->pMsg; + if (code <= 0) pOper->retCode = code; - if (code > 0) { + int32_t processedCount = atomic_add_fetch_32(&pOper->processedCount, 1); + if (processedCount <= 1) { if (pMsg != NULL) { - sdbDebug("app:%p:%p, waiting for slave to confirm this operation", pMsg->rpcMsg.ahandle, pMsg); + sdbDebug("app:%p:%p, waiting for confirm this operation, count:%d", pMsg->rpcMsg.ahandle, pMsg, processedCount); } return; } if (pMsg != NULL) { - sdbDebug("app:%p:%p, is confirmed and will do callback func, code:%s", pMsg->rpcMsg.ahandle, pMsg, tstrerror(code)); + sdbDebug("app:%p:%p, is confirmed and will do callback func", pMsg->rpcMsg.ahandle, pMsg); } if (pOper->cb != NULL) { - code = (*pOper->cb)(pMsg, code); + code = (*pOper->cb)(pMsg, pOper->retCode); } dnodeSendRpcMnodeWriteRsp(pMsg, code); @@ -543,31 +545,34 @@ static int sdbWrite(void *param, void *data, int type) { pthread_mutex_unlock(&tsSdbObj.mutex); return code; } - - // forward to peers, even it is WAL/FWD, it shall be called to update version in sync - int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC); - pthread_mutex_unlock(&tsSdbObj.mutex); - if (syncCode < 0) { - sdbDebug("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, - tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); - return syncCode; - } else if (syncCode > 0) { - sdbDebug("table:%s, forward request is sent, syncCode:%d action:%s record:%s version:%" PRId64, pTable->tableName, - syncCode, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); - } else { - } + pthread_mutex_unlock(&tsSdbObj.mutex); // from app, oper is created if (pOper != NULL) { - sdbDebug("table:%s, record from app is disposed, action:%s record:%s version:%" PRId64, pTable->tableName, - sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + // forward to peers + int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC); + if (syncCode <= 0) atomic_add_fetch_32(&pOper->processedCount, 1); + + if (syncCode < 0) { + sdbError("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, + tstrerror(syncCode), sdbGetActionStr(action), 
sdbGetKeyStr(pTable, pHead->cont), pHead->version); + } else if (syncCode > 0) { + sdbDebug("table:%s, forward request is sent, action:%s record:%s version:%" PRId64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + } else { + sdbTrace("table:%s, no need to send fwd request, action:%s record:%s version:%" PRId64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + } return syncCode; - } else { - sdbDebug("table:%s, record from wal/fwd is disposed, action:%s record:%s version:%" PRId64, pTable->tableName, - sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); } + sdbDebug("table:%s, record from wal/fwd is disposed, action:%s record:%s version:%" PRId64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + + // even it is WAL/FWD, it shall be called to update version in sync + syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC); + // from wal or forward msg, oper not created, should add into hash if (action == SDB_ACTION_INSERT) { SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable}; @@ -972,11 +977,6 @@ static void *sdbWorkerFp(void *param) { if (type == TAOS_QTYPE_RPC) { pOper = (SSdbOper *)item; - if (pOper == NULL) { - taosFreeQitem(item); - continue; - } - sdbDecRef(pOper->table, pOper->pObj); sdbConfirmForward(NULL, pOper, pOper->retCode); } else if (type == TAOS_QTYPE_FWD) { From 4f13c53acfc8c10afc84ea00344fdae4e79453fa Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 7 Jul 2020 10:36:57 +0000 Subject: [PATCH 17/24] [TD-860] ret code for sdb --- src/mnode/src/mnodeSdb.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 4aed820958..9f46a77483 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -262,10 +262,10 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { } if (pOper->cb != NULL) { - code = (*pOper->cb)(pMsg, pOper->retCode); + pOper->retCode = (*pOper->cb)(pMsg, pOper->retCode); } - dnodeSendRpcMnodeWriteRsp(pMsg, code); + dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode); taosFreeQitem(pOper); } @@ -552,7 +552,7 @@ static int sdbWrite(void *param, void *data, int type) { if (pOper != NULL) { // forward to peers int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC); - if (syncCode <= 0) atomic_add_fetch_32(&pOper->processedCount, 1); + if (syncCode > 0) pOper->processedCount = 0; if (syncCode < 0) { sdbError("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, @@ -953,6 +953,7 @@ static void *sdbWorkerFp(void *param) { taosGetQitem(tsSdbWriteQall, &type, &item); if (type == TAOS_QTYPE_RPC) { pOper = (SSdbOper *)item; + pOper->processedCount = 1; pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK; if (pOper->pMsg != NULL) { sdbDebug("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue", @@ -965,7 +966,7 @@ static void *sdbWorkerFp(void *param) { } int32_t code = sdbWrite(pOper, pHead, type); - if (pOper) pOper->retCode = code; + if (pOper && code <= 0) pOper->retCode = code; } walFsync(tsSdbObj.wal); From 453715c789546e888d5614f4a4658e232ccd2c7f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 7 Jul 2020 11:10:52 +0000 Subject: [PATCH 18/24] [TD-860] definite lost while update dnode --- src/mnode/src/mnodeDnode.c | 
12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 4707616ba8..804a64be08 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -88,13 +88,13 @@ static int32_t mnodeDnodeActionDelete(SSdbOper *pOper) { } static int32_t mnodeDnodeActionUpdate(SSdbOper *pOper) { - SDnodeObj *pDnode = pOper->pObj; - SDnodeObj *pSaved = mnodeGetDnode(pDnode->dnodeId); - if (pSaved != NULL && pDnode != pSaved) { - memcpy(pSaved, pDnode, pOper->rowSize); - free(pDnode); - mnodeDecDnodeRef(pSaved); + SDnodeObj *pNew = pOper->pObj; + SDnodeObj *pDnode = mnodeGetDnode(pNew->dnodeId); + if (pDnode != NULL && pNew != pDnode) { + memcpy(pDnode, pNew, pOper->rowSize); + free(pNew); } + mnodeDecDnodeRef(pDnode); return TSDB_CODE_SUCCESS; } From 566b286084eec3958f027386d0867ff7b6d26a68 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 7 Jul 2020 11:23:44 +0000 Subject: [PATCH 19/24] [TD-860] invalid write in sdb --- src/mnode/src/mnodeSdb.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 9f46a77483..0c2e478a07 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -551,8 +551,9 @@ static int sdbWrite(void *param, void *data, int type) { // from app, oper is created if (pOper != NULL) { // forward to peers + pOper->processedCount = 0; int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC); - if (syncCode > 0) pOper->processedCount = 0; + if (syncCode <= 0) pOper->processedCount = 1; if (syncCode < 0) { sdbError("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, From 7e792f465d7124fa0ed3a528f884f7ddc212f1de Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 7 Jul 2020 06:30:08 -0700 Subject: [PATCH 20/24] dont use sleep --- src/rpc/test/rclient.c | 1 + src/rpc/test/rserver.c | 21 ++++++--------------- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index e51b54e299..f8dbbedb11 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -156,6 +156,7 @@ int main(int argc, char *argv[]) { } tInfo("client is initialized"); + tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs); gettimeofday(&systemTime, NULL); startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec; diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index 1ac9409a57..d06e9df64b 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -24,23 +24,21 @@ int msgSize = 128; int commit = 0; int dataFd = -1; void *qhandle = NULL; +void *qset = NULL; void processShellMsg() { static int num = 0; taos_qall qall; SRpcMsg *pRpcMsg, rpcMsg; int type; + void *pvnode; qall = taosAllocateQall(); while (1) { - int numOfMsgs = taosReadAllQitems(qhandle, qall); - if (numOfMsgs <= 0) { - usleep(100); - continue; - } - + int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &pvnode); tDebug("%d shell msgs are received", numOfMsgs); + if (numOfMsgs <= 0) break; for (int i=0; i Date: Wed, 8 Jul 2020 09:01:50 +0800 Subject: [PATCH 21/24] [TD-872] --- src/kit/taosdemo/taosdemo.c | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 9a5aedcdb7..6ba420514b 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -520,9 +520,8 @@ int main(int argc, char *argv[]) { snprintf(command, 
BUFFER_SIZE, "create table if not exists %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols); queryDB(taos, command); printf("meters created!\n"); - - taos_close(taos); } + taos_close(taos); /* Wait for table to create */ multiThreadCreateTable(cols, use_metric, threads, ntables, db_name, tb_prefix, ip_addr, port, user, pass); @@ -792,9 +791,6 @@ void * createTable(void *sarg) snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", winfo->db_name, winfo->tb_prefix, i, winfo->cols); queryDB(winfo->taos, command); } - - taos_close(winfo->taos); - } else { /* Create all the tables; */ printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id); @@ -812,7 +808,6 @@ void * createTable(void *sarg) } queryDB(winfo->taos, command); } - taos_close(winfo->taos); } return NULL; From 4dadf6ad649ba8aebda5e652dc61535a3447b846 Mon Sep 17 00:00:00 2001 From: Hui Li Date: Wed, 8 Jul 2020 09:05:18 +0800 Subject: [PATCH 22/24] [fix bug: no free pSql] --- src/kit/shell/src/shellImport.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/kit/shell/src/shellImport.c b/src/kit/shell/src/shellImport.c index ba123ac2d4..a440db7301 100644 --- a/src/kit/shell/src/shellImport.c +++ b/src/kit/shell/src/shellImport.c @@ -206,9 +206,10 @@ static void shellSourceFile(TAOS *con, char *fptr) { if (code != 0) { fprintf(stderr, "DB error: %s: %s (%d)\n", taos_errstr(con), fname, lineNo); - /* free local resouce: allocated memory/metric-meta refcnt */ - taos_free_result(pSql); } + + /* free local resouce: allocated memory/metric-meta refcnt */ + taos_free_result(pSql); memset(cmd, 0, MAX_COMMAND_SIZE); cmd_len = 0; From a54a425c753719e3366ce9e579745d44aea00f5e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 8 Jul 2020 10:33:11 +0800 Subject: [PATCH 23/24] failed to create table in tsdb while sync failed --- src/mnode/src/mnodeTable.c | 26 +++++++++++++++++++------- src/mnode/src/mnodeVgroup.c | 18 ++++++++++++------ 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 47add8f7a3..5c32130925 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -783,9 +783,15 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) { static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) { SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; - if (pTable != NULL) { - mLInfo("app:%p:%p, stable:%s, is created in sdb, result:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, - tstrerror(code)); + assert(pTable); + + if (code == TSDB_CODE_SUCCESS) { + mLInfo("stable:%s, is created in sdb", pTable->info.tableId); + } else { + mError("app:%p:%p, stable:%s, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, + tstrerror(code)); + SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsSuperTableSdb}; + sdbDeleteRow(&desc); } return code; @@ -1561,10 +1567,16 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; assert(pTable); - mDebug("app:%p:%p, table:%s, create table in id:%d, uid:%" PRIu64 ", result:%s", pMsg->rpcMsg.ahandle, pMsg, - pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code)); - - if (code != TSDB_CODE_SUCCESS) return code; + if (code == TSDB_CODE_SUCCESS) { + mDebug("app:%p:%p, table:%s, create table in sid:%d, uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, 
pTable->info.tableId, + pTable->sid, pTable->uid); + } else { + mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg, + pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code)); + SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsChildTableSdb}; + sdbDeleteRow(&desc); + return code; + } SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, pTable); diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 3a9076335a..8fb1b749c2 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -348,17 +348,23 @@ void *mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup) { } static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { + SVgObj *pVgroup = pMsg->pVgroup; + SDbObj *pDb = pMsg->pDb; + assert(pVgroup); + if (code != TSDB_CODE_SUCCESS) { - pMsg->pVgroup = NULL; + mError("app:%p:%p, vgId:%d, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, + tstrerror(code)); + SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb}; + sdbDeleteRow(&desc); return code; } - SVgObj *pVgroup = pMsg->pVgroup; - SDbObj *pDb = pMsg->pDb; - - mInfo("vgId:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); + mInfo("app:%p:%p, vgId:%d, is created in mnode, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, + pDb->name, pVgroup->numOfVnodes); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - mInfo("vgId:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId); + mInfo("app:%p:%p, vgId:%d, index:%d, dnode:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, i, + pVgroup->vnodeGid[i].dnodeId); } mnodeIncVgroupRef(pVgroup); From 354dd93d2bbd2fc724e11aa63dda5c2ef75dd09c Mon Sep 17 00:00:00 2001 From: Hui Li Date: Wed, 8 Jul 2020 10:44:41 +0800 Subject: [PATCH 24/24] [add cluster sim cases] --- tests/script/unique/cluster/client1_0.sim | 16 + tests/script/unique/cluster/cluster_main.sim | 494 +++++++++++++++++++ 2 files changed, 510 insertions(+) create mode 100644 tests/script/unique/cluster/client1_0.sim create mode 100644 tests/script/unique/cluster/cluster_main.sim diff --git a/tests/script/unique/cluster/client1_0.sim b/tests/script/unique/cluster/client1_0.sim new file mode 100644 index 0000000000..f7ed46436e --- /dev/null +++ b/tests/script/unique/cluster/client1_0.sim @@ -0,0 +1,16 @@ +sql connect + +$db = db1 +$stb = stb1 +print =============== client1_0: + +sql use $db + +$tblNum = 1000 + +$i = 1 +while $i < $tblNum + $tb = tb . 
$i + sql create table $tb using $stb tags ($i, 'abcd') + $i = $i + 1 +endw diff --git a/tests/script/unique/cluster/cluster_main.sim b/tests/script/unique/cluster/cluster_main.sim new file mode 100644 index 0000000000..5b356948ea --- /dev/null +++ b/tests/script/unique/cluster/cluster_main.sim @@ -0,0 +1,494 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 +system sh/deploy.sh -n dnode4 -i 4 + +system sh/cfg.sh -n dnode1 -c numOfMnodes -v 3 +system sh/cfg.sh -n dnode2 -c numOfMnodes -v 3 +system sh/cfg.sh -n dnode3 -c numOfMnodes -v 3 +system sh/cfg.sh -n dnode4 -c numOfMnodes -v 3 + +system sh/cfg.sh -n dnode1 -c walLevel -v 1 +system sh/cfg.sh -n dnode2 -c walLevel -v 1 +system sh/cfg.sh -n dnode3 -c walLevel -v 1 +system sh/cfg.sh -n dnode4 -c walLevel -v 1 + +system sh/cfg.sh -n dnode1 -c balanceInterval -v 10 +system sh/cfg.sh -n dnode2 -c balanceInterval -v 10 +system sh/cfg.sh -n dnode3 -c balanceInterval -v 10 +system sh/cfg.sh -n dnode4 -c balanceInterval -v 10 + +system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4 +system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4 +system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4 +system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4 + +system sh/cfg.sh -n dnode1 -c alternativeRole -v 0 +system sh/cfg.sh -n dnode2 -c alternativeRole -v 0 +system sh/cfg.sh -n dnode3 -c alternativeRole -v 0 +system sh/cfg.sh -n dnode4 -c alternativeRole -v 0 + +system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 1000 +system sh/cfg.sh -n dnode2 -c maxtablesPerVnode -v 1000 +system sh/cfg.sh -n dnode3 -c maxtablesPerVnode -v 1000 +system sh/cfg.sh -n dnode4 -c maxtablesPerVnode -v 1000 + +system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator +system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator +system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator +system sh/cfg.sh -n dnode4 -c arbitrator -v $arbitrator + +print ============== step0: start tarbitrator +system sh/exec_tarbitrator.sh -s start + +print ============== step1: start dnode1/dnode2/dnode3 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +sleep 3000 +sql connect +sql create dnode $hostname2 +sql create dnode $hostname3 +sleep 3000 + +print ============== step2: create db1 with replica 3 +$db = db1 +print create database $db replica 3 +#sql create database $db replica 3 maxTables $totalTableNum +sql create database $db replica 3 +sql use $db + +print ============== step3: create stable stb1 +$stb = stb1 +sql create table $stb (ts timestamp, c1 int, c2 int) tags(t1 int, t2 binary(8)) + +print ============== step4: start 10 client1/ 10 client2/ 10 client3/ 10 client4/ 1 client5 +run_back unique/cluster/client1_0.sim +#run_back unique/cluster/client1_1.sim +#run_back unique/big_cluster/client1_2.sim +#run_back unique/big_cluster/client1_3.sim +#run_back unique/big_cluster/client1_4.sim +#run_back unique/big_cluster/client1_5.sim +#run_back unique/big_cluster/client1_6.sim +#run_back unique/big_cluster/client1_7.sim +#run_back unique/big_cluster/client1_8.sim +#run_back unique/big_cluster/client1_9.sim + + +print wait for a while to let clients start insert data +sleep 5000 + +$loop_cnt = 0 +loop_cluster_do: +print **** **** **** START loop cluster do **** **** **** **** +print ============== step5: start dnode4 and add into cluster, then wait dnode4 ready +system sh/exec.sh -n dnode4 -s start +sql create dnode $hostname4 + +wait_dnode4_ready_0: +$cnt = 
$cnt + 1 +if $cnt == 10 then + return -1 +endi +sql show dnodes +if $rows != 4 then + sleep 2000 + goto wait_dnode4_ready_0 +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 +print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 +$dnode1Status = $data4_1 +$dnode2Status = $data4_2 +$dnode3Status = $data4_3 +#$dnode4Status = $data4_4 + +if $loop_cnt == 0 then + $dnode4Status = $data4_4 +elif $loop_cnt == 1 then + $dnode4Status = $data4_6 +elif $loop_cnt == 2 then + $dnode4Status = $data4_8 +else then + print **** **** **** END loop cluster do 2**** **** **** **** + return +endi + +if $dnode4Status != ready then + sleep 2000 + goto wait_dnode4_ready_0 +endi + + +print ============== step6: stop and drop dnode1, then remove data dir of dnode1 +system sh/exec.sh -n dnode1 -s stop -x SIGINT + +$cnt = 0 +wait_dnode1_offline_0: +$cnt = $cnt + 1 +if $cnt == 10 then + return -1 +endi +sql show dnodes +if $rows != 4 then + sleep 2000 + goto wait_dnode1_offline_0 +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 +print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 + +$dnode2Status = $data4_2 +$dnode3Status = $data4_3 +$dnode4Status = $data4_4 + +if $loop_cnt == 0 then + $dnode1Status = $data4_1 +elif $loop_cnt == 1 then + $dnode1Status = $data4_5 +elif $loop_cnt == 2 then + $dnode1Status = $data4_7 +elif $loop_cnt == 3 then + $dnode1Status = $data4_9 +else then + print **** **** **** END loop cluster do 1**** **** **** **** + return +endi + +if $dnode1Status != offline then + sleep 2000 + goto wait_dnode1_offline_0 +endi + +sql drop dnode $hostname1 +system rm -rf ../../../sim/dnode1 + + +print ============== step7: stop dnode2, because mnodes < 50%, so clusert don't provide services +system sh/exec.sh -n dnode2 -s stop -x SIGINT + +sql show dnodes -x wait_dnode2_offline_0 +if $rows != 3 then + sleep 2000 + goto wait_dnode2_offline_0 +endi +wait_dnode2_offline_0: + +#$cnt = 0 +#wait_dnode2_offline_0: +#$cnt = $cnt + 1 +#if $cnt == 10 then +# return -1 +#endi +#sql show dnodes -x wait_dnode2_offline_0 +#if $rows != 3 then +# sleep 2000 +# goto wait_dnode2_offline_0 +#endi +#print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +#print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +#print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 +#print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 +#$dnode1Status = $data4_1 +#$dnode2Status = $data4_2 +#$dnode3Status = $data4_3 +#$dnode4Status = $data4_4 +# +#if $dnode2Status != offline then +# sleep 2000 +# goto wait_dnode1_offline_0 +#endi + +print ============== step8: restart dnode2, then wait sync end +system sh/exec.sh -n dnode2 -s start + +$cnt = 0 +wait_dnode2_ready_0: +$cnt = $cnt + 1 +if $cnt == 10 then + return -1 +endi +sql show dnodes +if $rows != 3 then + sleep 2000 + goto wait_dnode2_ready_0 +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 +print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 +$dnode1Status = $data4_1 +$dnode2Status = $data4_2 +$dnode3Status = $data4_3 +$dnode4Status = $data4_4 + +if $dnode2Status != ready then + sleep 2000 + goto wait_dnode2_ready_0 +endi + + +print ============== step9: stop dnode3, then wait sync end +system sh/exec.sh -n dnode3 -s stop -x SIGINT + +$cnt = 0 +wait_dnode3_offline_0: +$cnt = $cnt 
+ 1 +if $cnt == 10 then + return -1 +endi +sql show dnodes +if $rows != 3 then + sleep 2000 + goto wait_dnode3_offline_0 +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 +print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 +$dnode1Status = $data4_1 +$dnode2Status = $data4_2 +$dnode3Status = $data4_3 +$dnode4Status = $data4_4 + +if $dnode3Status != offline then + sleep 2000 + goto wait_dnode3_offline_0 +endi + +print ============== step10: restart dnode3, then wait sync end +system sh/exec.sh -n dnode3 -s start + +$cnt = 0 +wait_dnode3_ready_0: +$cnt = $cnt + 1 +if $cnt == 10 then + return -1 +endi +sql show dnodes +if $rows != 3 then + sleep 2000 + goto wait_dnode3_ready_0 +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 +print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 +$dnode1Status = $data4_1 +$dnode2Status = $data4_2 +$dnode3Status = $data4_3 +$dnode4Status = $data4_4 + +if $dnode3Status != ready then + sleep 2000 + goto wait_dnode3_ready_0 +endi + +print ============== step11: stop dnode4, then wait sync end +system sh/exec.sh -n dnode4 -s stop -x SIGINT + +$cnt = 0 +wait_dnode4_offline_0: +$cnt = $cnt + 1 +if $cnt == 10 then + return -1 +endi +sql show dnodes +if $rows != 3 then + sleep 2000 + goto wait_dnode4_offline_0 +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 +print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 +$dnode1Status = $data4_1 +$dnode2Status = $data4_2 +$dnode3Status = $data4_3 +#$dnode4Status = $data4_4 + +if $loop_cnt == 0 then + $dnode4Status = $data4_4 +elif $loop_cnt == 1 then + $dnode4Status = $data4_6 +elif $loop_cnt == 2 then + $dnode4Status = $data4_8 +else then + print **** **** **** END loop cluster do 2**** **** **** **** + return +endi + +if $dnode4Status != offline then + sleep 2000 + goto wait_dnode4_offline_0 +endi + +print ============== step12: restart dnode4, then wait sync end +system sh/exec.sh -n dnode4 -s start + +$cnt = 0 +wait_dnode4_ready_0: +$cnt = $cnt + 1 +if $cnt == 10 then + return -1 +endi +sql show dnodes +if $rows != 3 then + sleep 2000 + goto wait_dnode4_ready_0 +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 +print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 +$dnode1Status = $data4_1 +$dnode2Status = $data4_2 +$dnode3Status = $data4_3 +#$dnode4Status = $data4_4 + +if $loop_cnt == 0 then + $dnode4Status = $data4_4 +elif $loop_cnt == 1 then + $dnode4Status = $data4_6 +elif $loop_cnt == 2 then + $dnode4Status = $data4_8 +else then + print **** **** **** END loop cluster do 2**** **** **** **** + return +endi + +if $dnode4Status != ready then + sleep 2000 + goto wait_dnode4_ready_0 +endi + +print ============== step13: alter replica 2 +sql alter database $db replica 2 +sql show database +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 + +if $data0_5 != 2 then + print rplica is not modify to 2, error!!!!!! 
+ return +endi + +print ============== step14: stop and drop dnode4, then remove data dir of dnode4 +system sh/exec.sh -n dnode4 -s stop -x SIGINT + +$cnt = 0 +wait_dnode4_offline_1: +$cnt = $cnt + 1 +if $cnt == 10 then + return -1 +endi +sql show dnodes +if $rows != 3 then + sleep 2000 + goto wait_dnode4_offline_1 +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 +print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 + +$dnode2Status = $data4_2 +$dnode3Status = $data4_3 +#$dnode4Status = $data4_4 + +if $loop_cnt == 0 then + $dnode4Status = $data4_4 +elif $loop_cnt == 1 then + $dnode4Status = $data4_6 +elif $loop_cnt == 2 then + $dnode4Status = $data4_8 +else then + print **** **** **** END loop cluster do 2**** **** **** **** + return +endi + +if $dnode4Status != offline then + sleep 2000 + goto wait_dnode4_offline_1 +endi + +sql drop dnode $hostname4 +system rm -rf ../../../sim/dnode4 + + +print ============== step15: alter replica 1 +sql alter database $db replica 1 +sql show database +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 + +if $data0_5 != 1 then + print rplica is not modify to 1, error!!!!!! + return +endi + + +print ============== step16: alter replica 2 +sql alter database $db replica 1 +sql show database +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 + +if $data0_5 != 2 then + print rplica is not modify to 2, error!!!!!! + return +endi + +print ============== step17: start dnode1 and add into cluster, then wait dnode1 ready +system sh/exec.sh -n dnode1 -s start +sql create dnode $hostname1 + +wait_dnode1_ready_0: +$cnt = $cnt + 1 +if $cnt == 10 then + return -1 +endi +sql show dnodes +if $rows != 3 then + sleep 2000 + goto wait_dnode1_ready_0 +endi +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 +print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 +print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 +print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 +#$dnode1Status = $data4_1 +$dnode2Status = $data4_2 +$dnode3Status = $data4_3 +$dnode4Status = $data4_4 + +if $loop_cnt == 0 then + $dnode1Status = $data4_1 +elif $loop_cnt == 1 then + $dnode1Status = $data4_5 +elif $loop_cnt == 2 then + $dnode1Status = $data4_7 +elif $loop_cnt == 3 then + $dnode1Status = $data4_9 +else then + print **** **** **** END loop cluster do 3**** **** **** **** + return +endi + +if $dnode1Status != ready then + sleep 2000 + goto wait_dnode1_ready_0 +endi + +print ============== step18: alter replica 3 +sql alter database $db replica 3 +sql show database +print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 + +if $data0_5 != 3 then + print rplica is not modify to 3, error!!!!!! + return +endi + +$loop_cnt = $loop_cnt + 1 +goto loop_cluster_do
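
As a closing illustration of the mnode-side patches above (mnodeCreateSuperTableCb, mnodeDoCreateChildTableCb, mnodeCreateVgroupCb): when the create callback reports failure they now delete the half-inserted sdb row instead of leaving it dangling. The sketch below shows that rollback-on-failure shape under stated assumptions; Row, insert_row, delete_row and create_cb are hypothetical stand-ins, not TDengine code.

#include <stdio.h>

#define CODE_SUCCESS 0

typedef struct { const char *id; int stored; } Row;

static void insert_row(Row *r) { r->stored = 1; }   /* optimistic local insert */
static void delete_row(Row *r) { r->stored = 0; }   /* rollback helper */

/* invoked once replication of the insert has finished, successfully or not */
static int create_cb(Row *r, int code) {
  if (code == CODE_SUCCESS) {
    printf("%s: created\n", r->id);
  } else {
    printf("%s: create failed (%d), rolling back\n", r->id, code);
    delete_row(r);                     /* drop the half-created row */
  }
  return code;
}

int main(void) {
  Row stable = {.id = "stb1", .stored = 0};
  insert_row(&stable);
  create_cb(&stable, -1);              /* simulate a failed sync/confirm */
  printf("stored=%d\n", stable.stored); /* prints 0: no dangling row left */
  return 0;
}
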