From 2e2a3f10e18eb9955c9d5f9b95e6260eeb283868 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Mon, 8 Jun 2020 23:58:09 -0700 Subject: [PATCH 01/11] Added the -l option to crash_gen tool, writing larger amount of data --- tests/pytest/crash_gen.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index c88683aa09..0006247fc9 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -42,7 +42,10 @@ import crash_gen import taos # Global variables, tried to keep a small number. -gConfig = None # Command-line/Environment Configurations, will set a bit later + +# Command-line/Environment Configurations, will set a bit later +# ConfigNameSpace = argparse.Namespace +gConfig = argparse.Namespace() # Dummy value, will be replaced later logger = None def runThread(wt: WorkerThread): @@ -1171,8 +1174,8 @@ class AddFixedDataTask(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): ds = self._dbState wt.execSql("use db") # TODO: seems to be an INSERT bug to require this - for i in range(10): # 0 to 9 - for j in range(10) : + for i in range(35 if gConfig.larger_data else 2): # number of regular tables in the super table + for j in range(100 if gConfig.larger_data else 2) : # number of records per table sql = "insert into db.reg_table_{} using {} tags ('{}', {}) values ('{}', {});".format( i, ds.getFixedSuperTableName(), @@ -1301,10 +1304,12 @@ def main(): 2. You run the server there before this script: ./build/bin/taosd -c test/cfg ''')) - parser.add_argument('-p', '--per-thread-db-connection', action='store_true', - help='Use a single shared db connection (default: false)') parser.add_argument('-d', '--debug', action='store_true', help='Turn on DEBUG mode for more logging (default: false)') + parser.add_argument('-l', '--larger-data', action='store_true', + help='Write larger amount of data during write operations (default: false)') + parser.add_argument('-p', '--per-thread-db-connection', action='store_true', + help='Use a single shared db connection (default: false)') parser.add_argument('-s', '--max-steps', action='store', default=100, type=int, help='Maximum number of steps to run (default: 100)') parser.add_argument('-t', '--num-threads', action='store', default=10, type=int, From ac1fb970d5f1d6599f1d9a92752a40e18eb11aa3 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Tue, 9 Jun 2020 21:30:23 -0700 Subject: [PATCH 02/11] Added overlaping data insertion, plus occasional db connection drops --- tests/pytest/crash_gen.py | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 0006247fc9..d300a277e7 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -32,6 +32,7 @@ import textwrap from typing import List from typing import Dict +from typing import Set from util.log import * from util.dnodes import * @@ -969,7 +970,7 @@ class Task(): self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: self.logDebug("[=] Taos library exception: errno={:X}, msg: {}".format(err.errno, err)) - self._err = err + self._err = err except: self.logDebug("[=] Unexpected exception") raise @@ -1136,10 +1137,14 @@ class ReadFixedDataTask(StateTransitionTask): sTbName = self._dbState.getFixedSuperTableName() dbc = wt.getDbConn() dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later - rTables = dbc.getQueryResult() - # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) - for rTbName in rTables : # regular tables - dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure + if random.randrange(5) == 0 : # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations + dbc.close() + dbc.open() + else: + rTables = dbc.getQueryResult() + # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) + for rTbName in rTables : # regular tables + dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure # tdSql.query(" cars where tbname in ('carzero', 'carone')") @@ -1160,6 +1165,8 @@ class DropFixedSuperTableTask(StateTransitionTask): wt.execSql("drop table db.{}".format(tblName)) class AddFixedDataTask(StateTransitionTask): + activeTable : Set[int] = set() # Track which table is being actively worked on + @classmethod def getInfo(cls): return [ @@ -1174,14 +1181,24 @@ class AddFixedDataTask(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): ds = self._dbState wt.execSql("use db") # TODO: seems to be an INSERT bug to require this - for i in range(35 if gConfig.larger_data else 2): # number of regular tables in the super table - for j in range(100 if gConfig.larger_data else 2) : # number of records per table + tblSeq = list(range(35 if gConfig.larger_data else 2)) + random.shuffle(tblSeq) + for i in tblSeq: + if ( i in self.activeTable ): # wow already active + # logger.info("Concurrent data insertion into table: {}".format(i)) + # print("ct({})".format(i), end="", flush=True) # Concurrent insertion into table + print("x", end="", flush=True) + else: + self.activeTable.add(i) # marking it active + # No need to shuffle data sequence, unless later we decide to do non-increment insertion + for j in range(50 if gConfig.larger_data else 2) : # number of records per table sql = "insert into db.reg_table_{} using {} tags ('{}', {}) values ('{}', {});".format( i, ds.getFixedSuperTableName(), ds.getNextBinary(), ds.getNextFloat(), ds.getNextTick(), ds.getNextInt()) wt.execSql(sql) + self.activeTable.discard(i) # not raising an error, unlike remove #---------- Non State-Transition Related Tasks ----------# From 511953665d4e95efc9236042049d0e404d328579 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Tue, 9 Jun 2020 23:40:10 -0700 Subject: [PATCH 03/11] Removed crash_gen check conditions not suited for larger (10-thread) parallel processing tests --- tests/pytest/crash_gen.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index d300a277e7..9ae3b4bcbb 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -618,15 +618,15 @@ class StateDbOnly(AnyState): self.assertIfExistThenSuccess(tasks, DropDbTask) # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not true in massively parrallel cases # Nothing to be said about adding data task - if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB + # if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess - self.assertAtMostOneSuccess(tasks, DropDbTask) + # self.assertAtMostOneSuccess(tasks, DropDbTask) # self._state = self.STATE_EMPTY - elif ( self.hasSuccess(tasks, CreateFixedSuperTableTask) ): # did not drop db, create table success + if ( self.hasSuccess(tasks, CreateFixedSuperTableTask) ): # did not drop db, create table success # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table if ( not self.hasTask(tasks, DropFixedSuperTableTask) ): self.assertAtMostOneSuccess(tasks, CreateFixedSuperTableTask) # at most 1 attempt is successful, if we don't drop anything - self.assertNoTask(tasks, DropDbTask) # should have have tried + # self.assertNoTask(tasks, DropDbTask) # should have have tried # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet # # can't say there's add-data attempts, since they may all fail # self._state = self.STATE_TABLE_ONLY @@ -686,9 +686,10 @@ class StateHasData(AnyState): self.assertNoTask(tasks, DropFixedSuperTableTask) self.assertNoTask(tasks, AddFixedDataTask) # self.hasSuccess(tasks, DeleteDataTasks) - else: + else: # should be STATE_HAS_DATA self.assertNoTask(tasks, DropDbTask) - self.assertNoTask(tasks, DropFixedSuperTableTask) + if (not self.hasTask(tasks, CreateFixedSuperTableTask)) : # if we didn't create the table + self.assertNoTask(tasks, DropFixedSuperTableTask) # we should not have a task that drops it self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) From c6b71abc468635a93b19fddad02d68dfa6fc6af1 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Wed, 10 Jun 2020 00:11:50 -0700 Subject: [PATCH 04/11] Minor tweak of crash_gen script --- tests/pytest/crash_gen.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 9ae3b4bcbb..486edb1ba0 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -675,7 +675,8 @@ class StateHasData(AnyState): def verifyTasksToState(self, tasks, newState): if ( newState.equals(AnyState.STATE_EMPTY) ): self.hasSuccess(tasks, DropDbTask) - self.assertAtMostOneSuccess(tasks, DropDbTask) # TODO: dicy + if ( not self.hasTask(tasks, CreateDbTask) ) : + self.assertAtMostOneSuccess(tasks, DropDbTask) # TODO: dicy elif ( newState.equals(AnyState.STATE_DB_ONLY) ): # in DB only if ( not self.hasTask(tasks, CreateDbTask)): # without a create_db task self.assertNoTask(tasks, DropDbTask) # we must have drop_db task From 7dd6e6a442c8d33f2c722a6920e83a67a9c02f1d Mon Sep 17 00:00:00 2001 From: Steven Li Date: Thu, 11 Jun 2020 18:50:49 -0700 Subject: [PATCH 05/11] removed a check condition from crash_gen --- tests/pytest/crash_gen.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 486edb1ba0..7fe321a3eb 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -690,8 +690,8 @@ class StateHasData(AnyState): else: # should be STATE_HAS_DATA self.assertNoTask(tasks, DropDbTask) if (not self.hasTask(tasks, CreateFixedSuperTableTask)) : # if we didn't create the table - self.assertNoTask(tasks, DropFixedSuperTableTask) # we should not have a task that drops it - self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) + self.assertNoTask(tasks, DropFixedSuperTableTask) # we should not have a task that drops it + # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) # State of the database as we believe it to be From 815cb83904b0b543388c7c3f66be9187f77c28b3 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Fri, 12 Jun 2020 23:56:53 -0700 Subject: [PATCH 06/11] Added -r option for crash_gen.sh, recording data opersions, for power-off tests --- tests/pytest/crash_gen.py | 42 ++++++++++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 7fe321a3eb..e933a865d0 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -14,6 +14,7 @@ from __future__ import annotations # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel import sys +import os import traceback # Require Python 3 if sys.version_info[0] < 3: @@ -1168,6 +1169,24 @@ class DropFixedSuperTableTask(StateTransitionTask): class AddFixedDataTask(StateTransitionTask): activeTable : Set[int] = set() # Track which table is being actively worked on + LARGE_NUMBER_OF_TABLES = 35 + SMALL_NUMBER_OF_TABLES = 3 + LARGE_NUMBER_OF_RECORDS = 50 + SMALL_NUMBER_OF_RECORDS = 3 + + # We use these two files to record operations to DB, useful for power-off tests + fAddLogReady = None + fAddLogDone = None + + @classmethod + def prepToRecordOps(cls): + if gConfig.record_ops : + if ( cls.fAddLogReady == None ): + logger.info("Recording in a file operations to be performed...") + cls.fAddLogReady = open("add_log_ready.txt", "w") + if ( cls.fAddLogDone == None ): + logger.info("Recording in a file operations completed...") + cls.fAddLogDone = open("add_log_done.txt", "w") @classmethod def getInfo(cls): @@ -1183,7 +1202,7 @@ class AddFixedDataTask(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): ds = self._dbState wt.execSql("use db") # TODO: seems to be an INSERT bug to require this - tblSeq = list(range(35 if gConfig.larger_data else 2)) + tblSeq = list(range(self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)) random.shuffle(tblSeq) for i in tblSeq: if ( i in self.activeTable ): # wow already active @@ -1193,13 +1212,24 @@ class AddFixedDataTask(StateTransitionTask): else: self.activeTable.add(i) # marking it active # No need to shuffle data sequence, unless later we decide to do non-increment insertion - for j in range(50 if gConfig.larger_data else 2) : # number of records per table - sql = "insert into db.reg_table_{} using {} tags ('{}', {}) values ('{}', {});".format( - i, + for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS) : # number of records per table + nextInt = ds.getNextInt() + regTableName = "db.reg_table_{}".format(i) + if gConfig.record_ops: + self.prepToRecordOps() + self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) + self.fAddLogReady.flush() + os.fsync(self.fAddLogReady) + sql = "insert into {} using {} tags ('{}', {}) values ('{}', {});".format( + regTableName, ds.getFixedSuperTableName(), ds.getNextBinary(), ds.getNextFloat(), - ds.getNextTick(), ds.getNextInt()) + ds.getNextTick(), nextInt) wt.execSql(sql) + if gConfig.record_ops: + self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) + self.fAddLogDone.flush() + os.fsync(self.fAddLogDone) self.activeTable.discard(i) # not raising an error, unlike remove @@ -1329,6 +1359,8 @@ def main(): help='Write larger amount of data during write operations (default: false)') parser.add_argument('-p', '--per-thread-db-connection', action='store_true', help='Use a single shared db connection (default: false)') + parser.add_argument('-r', '--record-ops', action='store_true', + help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)') parser.add_argument('-s', '--max-steps', action='store', default=100, type=int, help='Maximum number of steps to run (default: 100)') parser.add_argument('-t', '--num-threads', action='store', default=10, type=int, From 48e4f79af3f7144aa33e40685fa8858df8c3dede Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 25 Jun 2020 04:53:28 +0000 Subject: [PATCH 07/11] remove timer for TCP --- src/rpc/src/rpcMain.c | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 9a4509f8e1..640b03e885 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -819,7 +819,8 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { tTrace("%s, authentication shall be restarted", pConn->info); pConn->secured = 0; rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); - pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); + if (pConn->connType != RPC_CONN_TCPC) + pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); return TSDB_CODE_RPC_ALREADY_PROCESSED; } @@ -828,7 +829,8 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { tTrace("%s, peer is still processing the transaction, retry:%d", pConn->info, pConn->tretry); pConn->tretry++; rpcSendReqHead(pConn); - pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); + if (pConn->connType != RPC_CONN_TCPC) + pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); return TSDB_CODE_RPC_ALREADY_PROCESSED; } else { // peer still in processing, give up @@ -896,8 +898,12 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { terrno = rpcProcessReqHead(pConn, pHead); pConn->connType = pRecv->connType; - // client shall send the request within tsRpcTime again, double it - taosTmrReset(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); + // stop idle timer + taosTmrStopA(&pConn->pIdleTimer); + + // client shall send the request within tsRpcTime again for UDP, double it + if (pConn->connType != RPC_CONN_TCPS) + pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl); } else { terrno = rpcProcessRspHead(pConn, pHead); } @@ -1024,7 +1030,8 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { rpcAddRef(pRpc); // add the refCount for requests // start the progress timer to monitor the response from server app - pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl); + if (pConn->connType != RPC_CONN_TCPS) + pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl); // notify the server app (*(pRpc->cfp))(&rpcMsg, NULL); @@ -1187,7 +1194,8 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pConn->pContext = pContext; rpcSendMsgToPeer(pConn, msg, msgLen); - taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); + if (pConn->connType != RPC_CONN_TCPC) + taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); rpcUnlockConn(pConn); } From 5f979568e113fd83d4e3c28176d2985d7ae28e56 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Wed, 24 Jun 2020 21:59:13 -0700 Subject: [PATCH 08/11] Refactored and cleaned up crash_gen tool, ready to add sub-state transitions --- tests/pytest/crash_gen.py | 143 ++++++++++++++------------------------ 1 file changed, 52 insertions(+), 91 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index e933a865d0..bff2403bb8 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -69,7 +69,7 @@ class WorkerThread: # self._curStep = -1 self._pool = pool self._tid = tid - self._tc = tc + self._tc = tc # type: ThreadCoordinator # self.threadIdent = threading.get_ident() self._thread = threading.Thread(target=runThread, args=(self,)) self._stepGate = threading.Event() @@ -161,13 +161,13 @@ class WorkerThread: if ( gConfig.per_thread_db_connection ): return self._dbConn.execute(sql) else: - return self._tc.getDbState().getDbConn().execute(sql) + return self._tc.getDbManager().getDbConn().execute(sql) def getDbConn(self): if ( gConfig.per_thread_db_connection ): return self._dbConn else: - return self._tc.getDbState().getDbConn() + return self._tc.getDbManager().getDbConn() # def querySql(self, sql): # not "execute", since we are out side the DB context # if ( gConfig.per_thread_db_connection ): @@ -176,12 +176,12 @@ class WorkerThread: # return self._tc.getDbState().getDbConn().query(sql) class ThreadCoordinator: - def __init__(self, pool, dbState): + def __init__(self, pool, dbManager): self._curStep = -1 # first step is 0 self._pool = pool # self._wd = wd self._te = None # prepare for every new step - self._dbState = dbState + self._dbManager = dbManager self._executedTasks: List[Task] = [] # in a given step self._lock = threading.RLock() # sync access for a few things @@ -191,8 +191,8 @@ class ThreadCoordinator: def getTaskExecutor(self): return self._te - def getDbState(self) -> DbState : - return self._dbState + def getDbManager(self) -> DbManager : + return self._dbManager def crossStepBarrier(self): self._stepBarrier.wait() @@ -216,7 +216,7 @@ class ThreadCoordinator: # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" try: - self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state + self._dbManager.transition(self._executedTasks) # at end of step, transiton the DB state except taos.error.ProgrammingError as err: if ( err.msg == 'network unavailable' ): # broken DB connection logger.info("DB connection broken, execution failed") @@ -289,8 +289,8 @@ class ThreadCoordinator: # logger.debug(" (dice:{}/{}) ".format(i, nTasks)) # # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. # return tasks[i].clone() # TODO: still necessary? - taskType = self.getDbState().pickTaskType() # pick a task type for current state - return taskType(self.getDbState(), self._execStats) # create a task from it + taskType = self.getDbManager().pickTaskType() # pick a task type for current state + return taskType(self.getDbManager(), self._execStats) # create a task from it def resetExecutedTasks(self): self._executedTasks = [] # should be under single thread @@ -301,16 +301,12 @@ class ThreadCoordinator: # We define a class to run a number of threads in locking steps. class ThreadPool: - def __init__(self, dbState, numThreads, maxSteps, funcSequencer): + def __init__(self, numThreads, maxSteps): self.numThreads = numThreads self.maxSteps = maxSteps - self.funcSequencer = funcSequencer # Internal class variables - # self.dispatcher = WorkDispatcher(dbState) # Obsolete? self.curStep = 0 self.threadList = [] - # self.stepGate = threading.Condition() # Gate to hold/sync all threads - # self.numWaitingThreads = 0 # starting to run all the threads, in locking steps def createAndStartThreads(self, tc: ThreadCoordinator): @@ -324,7 +320,8 @@ class ThreadPool: logger.debug("Joining thread...") workerThread._thread.join() -# A queue of continguous POSITIVE integers +# A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers +# for new table names class LinearQueue(): def __init__(self): self.firstIndex = 1 # 1st ever element @@ -600,9 +597,9 @@ class StateEmpty(AnyState): ] def verifyTasksToState(self, tasks, newState): - if ( self.hasSuccess(tasks, CreateDbTask) ): # at EMPTY, if there's succes in creating DB - if ( not self.hasTask(tasks, DropDbTask) ) : # and no drop_db tasks - self.assertAtMostOneSuccess(tasks, CreateDbTask) # we must have at most one. TODO: compare numbers + if ( self.hasSuccess(tasks, TaskCreateDb) ): # at EMPTY, if there's succes in creating DB + if ( not self.hasTask(tasks, TaskDropDb) ) : # and no drop_db tasks + self.assertAtMostOneSuccess(tasks, TaskCreateDb) # we must have at most one. TODO: compare numbers class StateDbOnly(AnyState): def getInfo(self): @@ -614,19 +611,19 @@ class StateDbOnly(AnyState): ] def verifyTasksToState(self, tasks, newState): - if ( not self.hasTask(tasks, CreateDbTask) ): - self.assertAtMostOneSuccess(tasks, DropDbTask) # only if we don't create any more - self.assertIfExistThenSuccess(tasks, DropDbTask) + if ( not self.hasTask(tasks, TaskCreateDb) ): + self.assertAtMostOneSuccess(tasks, TaskDropDb) # only if we don't create any more + self.assertIfExistThenSuccess(tasks, TaskDropDb) # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not true in massively parrallel cases # Nothing to be said about adding data task # if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess # self.assertAtMostOneSuccess(tasks, DropDbTask) # self._state = self.STATE_EMPTY - if ( self.hasSuccess(tasks, CreateFixedSuperTableTask) ): # did not drop db, create table success + if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table - if ( not self.hasTask(tasks, DropFixedSuperTableTask) ): - self.assertAtMostOneSuccess(tasks, CreateFixedSuperTableTask) # at most 1 attempt is successful, if we don't drop anything + if ( not self.hasTask(tasks, TaskDropSuperTable) ): + self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything # self.assertNoTask(tasks, DropDbTask) # should have have tried # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet # # can't say there's add-data attempts, since they may all fail @@ -650,8 +647,8 @@ class StateSuperTableOnly(AnyState): ] def verifyTasksToState(self, tasks, newState): - if ( self.hasSuccess(tasks, DropFixedSuperTableTask) ): # we are able to drop the table - self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) + if ( self.hasSuccess(tasks, TaskDropSuperTable) ): # we are able to drop the table + self.assertAtMostOneSuccess(tasks, TaskDropSuperTable) # self._state = self.STATE_DB_ONLY # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data # self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases @@ -675,28 +672,28 @@ class StateHasData(AnyState): def verifyTasksToState(self, tasks, newState): if ( newState.equals(AnyState.STATE_EMPTY) ): - self.hasSuccess(tasks, DropDbTask) - if ( not self.hasTask(tasks, CreateDbTask) ) : - self.assertAtMostOneSuccess(tasks, DropDbTask) # TODO: dicy + self.hasSuccess(tasks, TaskDropDb) + if ( not self.hasTask(tasks, TaskCreateDb) ) : + self.assertAtMostOneSuccess(tasks, TaskDropDb) # TODO: dicy elif ( newState.equals(AnyState.STATE_DB_ONLY) ): # in DB only - if ( not self.hasTask(tasks, CreateDbTask)): # without a create_db task - self.assertNoTask(tasks, DropDbTask) # we must have drop_db task - self.hasSuccess(tasks, DropFixedSuperTableTask) + if ( not self.hasTask(tasks, TaskCreateDb)): # without a create_db task + self.assertNoTask(tasks, TaskDropDb) # we must have drop_db task + self.hasSuccess(tasks, TaskDropSuperTable) # self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted - self.assertNoTask(tasks, DropDbTask) - self.assertNoTask(tasks, DropFixedSuperTableTask) - self.assertNoTask(tasks, AddFixedDataTask) + self.assertNoTask(tasks, TaskDropDb) + self.assertNoTask(tasks, TaskDropSuperTable) + self.assertNoTask(tasks, TaskAddData) # self.hasSuccess(tasks, DeleteDataTasks) else: # should be STATE_HAS_DATA - self.assertNoTask(tasks, DropDbTask) - if (not self.hasTask(tasks, CreateFixedSuperTableTask)) : # if we didn't create the table - self.assertNoTask(tasks, DropFixedSuperTableTask) # we should not have a task that drops it + self.assertNoTask(tasks, TaskDropDb) + if (not self.hasTask(tasks, TaskCreateSuperTable)) : # if we didn't create the table + self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) -# State of the database as we believe it to be -class DbState(): +# Manager of the Database Data/Connection +class DbManager(): def __init__(self, resetDb = True): self.tableNumQueue = LinearQueue() @@ -879,11 +876,11 @@ class DbState(): # Generic Checks, first based on the start state if self._state.canCreateDb(): - self._state.assertIfExistThenSuccess(tasks, CreateDbTask) + self._state.assertIfExistThenSuccess(tasks, TaskCreateDb) # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops if self._state.canDropDb(): - self._state.assertIfExistThenSuccess(tasks, DropDbTask) + self._state.assertIfExistThenSuccess(tasks, TaskDropDb) # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop # if self._state.canCreateFixedTable(): @@ -930,8 +927,8 @@ class Task(): # logger.debug("Allocating taskSN: {}".format(Task.taskSn)) return Task.taskSn - def __init__(self, dbState: DbState, execStats: ExecutionStats): - self._dbState = dbState + def __init__(self, dbManager: DbManager, execStats: ExecutionStats): + self._dbState = dbManager self._workerThread = None self._err = None self._curStep = None @@ -1075,7 +1072,7 @@ class StateTransitionTask(Task): -class CreateDbTask(StateTransitionTask): +class TaskCreateDb(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1090,7 +1087,7 @@ class CreateDbTask(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): wt.execSql("create database db") -class DropDbTask(StateTransitionTask): +class TaskDropDb(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1106,7 +1103,7 @@ class DropDbTask(StateTransitionTask): wt.execSql("drop database db") logger.debug("[OPS] database dropped at {}".format(time.time())) -class CreateFixedSuperTableTask(StateTransitionTask): +class TaskCreateSuperTable(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1124,7 +1121,7 @@ class CreateFixedSuperTableTask(StateTransitionTask): # No need to create the regular tables, INSERT will do that automatically -class ReadFixedDataTask(StateTransitionTask): +class TaskReadData(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1151,7 +1148,7 @@ class ReadFixedDataTask(StateTransitionTask): # tdSql.query(" cars where tbname in ('carzero', 'carone')") -class DropFixedSuperTableTask(StateTransitionTask): +class TaskDropSuperTable(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1167,7 +1164,7 @@ class DropFixedSuperTableTask(StateTransitionTask): tblName = self._dbState.getFixedSuperTableName() wt.execSql("drop table db.{}".format(tblName)) -class AddFixedDataTask(StateTransitionTask): +class TaskAddData(StateTransitionTask): activeTable : Set[int] = set() # Track which table is being actively worked on LARGE_NUMBER_OF_TABLES = 35 SMALL_NUMBER_OF_TABLES = 3 @@ -1233,42 +1230,6 @@ class AddFixedDataTask(StateTransitionTask): self.activeTable.discard(i) # not raising an error, unlike remove -#---------- Non State-Transition Related Tasks ----------# - -class CreateTableTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tIndex = self._dbState.addTable() - self.logDebug("Creating a table {} ...".format(tIndex)) - wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex)) - self.logDebug("Table {} created.".format(tIndex)) - self._dbState.releaseTable(tIndex) - -class DropTableTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tableName = self._dbState.getTableNameToDelete() - if ( not tableName ): # May be "False" - self.logInfo("Cannot generate a table to delete, skipping...") - return - self.logInfo("Dropping a table db.{} ...".format(tableName)) - wt.execSql("drop table db.{}".format(tableName)) - - - -class AddDataTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - ds = self._dbState - self.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText())) - tIndex = ds.pickAndAllocateTable() - if ( tIndex == None ): - self.logInfo("No table found to add data, skipping...") - return - sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) - self.logDebug("[SQL] Executing SQL: {}".format(sql)) - wt.execSql(sql) - ds.releaseTable(tIndex) - self.logDebug("[OPS] Finished adding data") - - # Deterministic random number generator class Dice(): seeded = False # static, uninitialized @@ -1384,12 +1345,12 @@ def main(): # resetDb = False # DEBUG only # dbState = DbState(resetDb) # DBEUG only! - dbState = DbState() # Regular function + dbManager = DbManager() # Regular function Dice.seed(0) # initial seeding of dice tc = ThreadCoordinator( - ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0), + ThreadPool(gConfig.num_threads, gConfig.max_steps), # WorkDispatcher(dbState), # Obsolete? - dbState + dbManager ) # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix @@ -1437,7 +1398,7 @@ def main(): tc.run() tc.logStats() - dbState.cleanUp() + dbManager.cleanUp() # logger.info("Crash_Gen execution finished") From 2ef4eef248f9b484c5ae86dea3344b1c0655f2af Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 25 Jun 2020 05:51:25 +0000 Subject: [PATCH 09/11] remove memory leak --- src/rpc/src/rpcMain.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 640b03e885..e9ddd89467 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -1063,9 +1063,11 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { for (int i=0; iipSet.numOfIps; ++i) pContext->ipSet.port[i] = htons(pContext->ipSet.port[i]); rpcSendReqToServer(pRpc, pContext); + rpcFreeCont(rpcMsg.pCont); } else if (pHead->code == TSDB_CODE_RPC_NOT_READY) { pContext->code = pHead->code; rpcProcessConnError(pContext, NULL); + rpcFreeCont(rpcMsg.pCont); } else { rpcNotifyClient(pContext, &rpcMsg); } From 7c30fd22e158d34ce8b655272b2b83fa0a51cfc6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Jun 2020 15:41:40 +0800 Subject: [PATCH 10/11] [td-719] fix bugs in affected rows --- src/client/src/tscParseInsert.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index df4ccca9bc..1db4108d22 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1351,6 +1351,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) { static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlocks *pTableDataBlocks) { int32_t code = TSDB_CODE_SUCCESS; SSqlCmd *pCmd = &pSql->cmd; + pSql->res.numOfRows = 0; assert(pCmd->numOfClause == 1); STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta; @@ -1394,6 +1395,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { fclose(fp); pParentSql->res.code = code; + tscQueueAsyncRes(pParentSql); return; } @@ -1458,8 +1460,11 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { free(line); if (count > 0) { - if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) { + code = doPackSendDataBlock(pSql, count, pTableDataBlock); + if (code != TSDB_CODE_SUCCESS) { pParentSql->res.code = code; + tscQueueAsyncRes(pParentSql); + return; } } else { From b1859bc19393389815813c75474ce62d796e269c Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 25 Jun 2020 08:36:22 +0000 Subject: [PATCH 11/11] uninitialized variable cleanup timer in last step --- src/dnode/src/dnodeMgmt.c | 2 +- src/mnode/src/mnodeMain.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index d971e3ad6d..7cc4bbcefa 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -721,7 +721,7 @@ int32_t dnodeGetDnodeId() { } void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) { - SRpcConnInfo connInfo; + SRpcConnInfo connInfo = {0}; rpcGetConnInfo(rpcMsg->handle, &connInfo); SRpcIpSet ipSet = {0}; diff --git a/src/mnode/src/mnodeMain.c b/src/mnode/src/mnodeMain.c index 96dc700783..5e44ea5731 100644 --- a/src/mnode/src/mnodeMain.c +++ b/src/mnode/src/mnodeMain.c @@ -121,8 +121,8 @@ void mnodeCleanupSystem() { dnodeFreeMnodeWqueue(); dnodeFreeMnodeRqueue(); dnodeFreeMnodePqueue(); - mnodeCleanupTimer(); mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1); + mnodeCleanupTimer(); mPrint("mnode is cleaned up"); }