From 3b2b281045543555f9f637138d797415d87a919b Mon Sep 17 00:00:00 2001
From: Shengliang Guan
Date: Sun, 28 Jun 2020 09:23:23 +0800
Subject: [PATCH 1/3] [TD-771] refCount while process sdbqueue

---
 src/mnode/src/mnodeSdb.c   | 35 ++++++++++++++++++++++++++---------
 src/mnode/src/mnodeTable.c |  2 +-
 tests/script/loop.sh       |  9 +++++++--
 3 files changed, 34 insertions(+), 12 deletions(-)

diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c
index a9d450aff0..a6d373de99 100644
--- a/src/mnode/src/mnodeSdb.c
+++ b/src/mnode/src/mnodeSdb.c
@@ -551,16 +551,20 @@ static int sdbWrite(void *param, void *data, int type) {
 
   // from app, oper is created
   if (pOper != NULL) {
-    sdbTrace("record from app is disposed, version:%" PRIu64 " result:%s", pHead->version, tstrerror(code));
+    sdbTrace("record from app is disposed, table:%s action:%s record:%s version:%" PRIu64 " result:%s",
+             pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version,
+             tstrerror(code));
     return code;
   }
-  
+
   // from wal or forward msg, oper not created, should add into hash
   if (tsSdbObj.sync != NULL) {
-    sdbTrace("record from wal forward is disposed, version:%" PRIu64 " confirm it", pHead->version);
+    sdbTrace("record from wal forward is disposed, table:%s action:%s record:%s version:%" PRIu64 " confirm it",
+             pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
     syncConfirmForward(tsSdbObj.sync, pHead->version, code);
   } else {
-    sdbTrace("record from wal restore is disposed, version:%" PRIu64 , pHead->version);
+    sdbTrace("record from wal restore is disposed, table:%s action:%s record:%s version:%" PRIu64, pTable->tableName,
+             sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
   }
 
   if (action == SDB_ACTION_INSERT) {
@@ -626,9 +630,11 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
   memcpy(pNewOper, pOper, sizeof(SSdbOper));
 
   if (pNewOper->pMsg != NULL) {
-    sdbTrace("app:%p:%p, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg);
+    sdbTrace("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle,
+             pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
   }
 
+  sdbIncRef(pNewOper->table, pNewOper->pObj);
   taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
   return TSDB_CODE_SUCCESS;
 }
@@ -674,9 +680,11 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
   memcpy(pNewOper, pOper, sizeof(SSdbOper));
 
   if (pNewOper->pMsg != NULL) {
-    sdbTrace("app:%p:%p, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg);
+    sdbTrace("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle,
+             pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
   }
 
+  sdbIncRef(pNewOper->table, pNewOper->pObj);
   taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
   return TSDB_CODE_SUCCESS;
 }
@@ -722,9 +730,11 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
   memcpy(pNewOper, pOper, sizeof(SSdbOper));
 
  if (pNewOper->pMsg != NULL) {
-    sdbTrace("app:%p:%p, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg);
+    sdbTrace("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle,
+             pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
  }
 
+  sdbIncRef(pNewOper->table, pNewOper->pObj);
   taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
   return TSDB_CODE_SUCCESS;
 }
@@ -943,7 +953,9 @@ static void *sdbWorkerFp(void *param) {
       }
 
       if (pOper != NULL && pOper->pMsg != NULL) {
-        sdbTrace("app:%p:%p, will be processed in sdb queue", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg);
+        sdbTrace("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue",
+                 pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj,
+                 sdbGetKeyStr(pOper->table, pOper->pObj), pHead->version);
       }
 
       int32_t code = sdbWrite(pOper, pHead, type);
@@ -959,15 +971,20 @@
       if (type == TAOS_QTYPE_RPC) {
         pOper = (SSdbOper *)item;
         if (pOper != NULL && pOper->cb != NULL) {
+          sdbTrace("app:%p:%p, will do callback func, index:%d", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, i);
           pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode);
         }
-        
+
         if (pOper != NULL && pOper->pMsg != NULL) {
           sdbTrace("app:%p:%p, msg is processed, result:%s", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg,
                    tstrerror(pOper->retCode));
         }
 
         dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode);
+
+        if (pOper != NULL) {
+          sdbDecRef(pOper->table, pOper->pObj);
+        }
       }
       taosFreeQitem(item);
     }
diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c
index 4906aeaeb0..da998a6578 100644
--- a/src/mnode/src/mnodeTable.c
+++ b/src/mnode/src/mnodeTable.c
@@ -1420,7 +1420,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
       continue;
     }
     if (pTable->vgHash == NULL) {
-      mError("app:%p:%p, stable:%s, not vgroup exist while get stable vgroup info", pMsg->rpcMsg.ahandle, pMsg,
+      mError("app:%p:%p, stable:%s, no vgroup exist while get stable vgroup info", pMsg->rpcMsg.ahandle, pMsg,
             stableName);
 
       mnodeDecTableRef(pTable);
diff --git a/tests/script/loop.sh b/tests/script/loop.sh
index 3e6306d624..b435eef123 100755
--- a/tests/script/loop.sh
+++ b/tests/script/loop.sh
@@ -11,8 +11,9 @@ set -e
 
 CMD_NAME=
 LOOP_TIMES=5
+SLEEP_TIME=0
 
-while getopts "f:t:" arg
+while getopts "f:t:s:" arg
 do
   case $arg in
     f)
@@ -21,6 +22,9 @@ do
     t)
      LOOP_TIMES=$OPTARG
      ;;
+    s)
+     SLEEP_TIME=$OPTARG
+     ;;
    ?)
echo "unknow argument" ;; @@ -29,6 +33,7 @@ done echo LOOP_TIMES ${LOOP_TIMES} echo CMD_NAME ${CMD_NAME} +echo SLEEP_TIME ${SLEEP_TIME} GREEN='\033[1;32m' GREEN_DARK='\033[0;32m' @@ -40,5 +45,5 @@ do echo -e $GREEN loop $i $NC echo -e $GREEN cmd $CMD_NAME $NC $CMD_NAME - sleep 2 + sleep ${SLEEP_TIME} done From abb55ed0744dfe7a8cb7575af07b756e463bfa74 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 28 Jun 2020 02:55:11 +0000 Subject: [PATCH 2/3] revert crash_gen.py --- tests/pytest/crash_gen.py | 137 +++++++++++++++++--------------------- 1 file changed, 62 insertions(+), 75 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 916e8904ff..cc41fd5e7d 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -686,8 +686,7 @@ class StateHasData(AnyState): self.assertNoTask(tasks, TaskAddData) # self.hasSuccess(tasks, DeleteDataTasks) else: # should be STATE_HAS_DATA - if (not self.hasTask(tasks, TaskCreateDb) ): # only if we didn't create one - self.assertNoTask(tasks, TaskDropDb) # we shouldn't have dropped it + self.assertNoTask(tasks, TaskDropDb) if (not self.hasTask(tasks, TaskCreateSuperTable)) : # if we didn't create the table self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) @@ -1296,67 +1295,7 @@ class LoggingFilter(logging.Filter): # return False return True -class MainExec: - @classmethod - def runClient(cls): - # resetDb = False # DEBUG only - # dbState = DbState(resetDb) # DBEUG only! - dbManager = DbManager() # Regular function - Dice.seed(0) # initial seeding of dice - thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps) - tc = ThreadCoordinator(thPool, dbManager) - tc.run() - tc.logStats() - dbManager.cleanUp() - - @classmethod - def runService(cls): - print("Running service...") - - @classmethod - def runTemp(cls): # for debugging purposes - # # Hack to exercise reading from disk, imcreasing coverage. 
-        # dbc = dbState.getDbConn()
-        # sTbName = dbState.getFixedSuperTableName()
-        # dbc.execute("create database if not exists db")
-        # if not dbState.getState().equals(StateEmpty()):
-        #     dbc.execute("use db")
-
-        # rTables = None
-        # try: # the super table may not exist
-        #     sql = "select TBNAME from db.{}".format(sTbName)
-        #     logger.info("Finding out tables in super table: {}".format(sql))
-        #     dbc.query(sql) # TODO: analyze result set later
-        #     logger.info("Fetching result")
-        #     rTables = dbc.getQueryResult()
-        #     logger.info("Result: {}".format(rTables))
-        # except taos.error.ProgrammingError as err:
-        #     logger.info("Initial Super table OPS error: {}".format(err))
-
-        # # sys.exit()
-        # if ( not rTables == None):
-        #     # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
-        #     try:
-        #         for rTbName in rTables : # regular tables
-        #             ds = dbState
-        #             logger.info("Inserting into table: {}".format(rTbName[0]))
-        #             sql = "insert into db.{} values ('{}', {});".format(
-        #                 rTbName[0],
-        #                 ds.getNextTick(), ds.getNextInt())
-        #             dbc.execute(sql)
-        #         for rTbName in rTables : # regular tables
-        #             dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
-        #         logger.info("Initial READING operation is successful")
-        #     except taos.error.ProgrammingError as err:
-        #         logger.info("Initial WRITE/READ error: {}".format(err))
-
-        # Sandbox testing code
-        # dbc = dbState.getDbConn()
-        # while True:
-        #     rows = dbc.query("show databases")
-        #     print("Rows: {}, time={}".format(rows, time.time()))
-        return
 
 
 def main():
     # Super cool Python argument library: https://docs.python.org/3/library/argparse.html
@@ -1369,27 +1308,24 @@ def main():
             2. You run the server there before this script: ./build/bin/taosd -c test/cfg
             '''))
 
-    parser.add_argument('-d', '--debug', action='store_true', help='Turn on DEBUG mode for more logging (default: false)')
-    parser.add_argument('-e', '--run-tdengine', action='store_true',
-                        help='Run TDengine service in foreground (default: false)')
     parser.add_argument('-l', '--larger-data', action='store_true',
                         help='Write larger amount of data during write operations (default: false)')
-    parser.add_argument('-p', '--per-thread-db-connection', action='store_false',
+    parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
                         help='Use a single shared db connection (default: false)')
     parser.add_argument('-r', '--record-ops', action='store_true',
                        help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)')
-    parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int,
+    parser.add_argument('-s', '--max-steps', action='store', default=100, type=int,
                         help='Maximum number of steps to run (default: 100)')
-    parser.add_argument('-t', '--num-threads', action='store', default=5, type=int,
+    parser.add_argument('-t', '--num-threads', action='store', default=10, type=int,
                         help='Number of threads to run (default: 10)')
 
     global gConfig
     gConfig = parser.parse_args()
 
-    # if len(sys.argv) == 1:
-    #     parser.print_help()
-    #     sys.exit()
+    if len(sys.argv) == 1:
+        parser.print_help()
+        sys.exit()
 
     global logger
     logger = logging.getLogger('CrashGen')
@@ -1401,11 +1337,62 @@ def main():
     ch = logging.StreamHandler()
     logger.addHandler(ch)
 
-    if gConfig.run_tdengine : # run server
-        MainExec.runService()
-    else :
-        MainExec.runClient()
+    # resetDb = False # DEBUG only
+    # dbState = DbState(resetDb) # DBEUG only!
+    dbManager = DbManager() # Regular function
+    Dice.seed(0) # initial seeding of dice
+    tc = ThreadCoordinator(
+        ThreadPool(gConfig.num_threads, gConfig.max_steps),
+        # WorkDispatcher(dbState), # Obsolete?
+        dbManager
+        )
 
+    # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix
+    # dbc = dbState.getDbConn()
+    # sTbName = dbState.getFixedSuperTableName()
+    # dbc.execute("create database if not exists db")
+    # if not dbState.getState().equals(StateEmpty()):
+    #     dbc.execute("use db")
+
+    # rTables = None
+    # try: # the super table may not exist
+    #     sql = "select TBNAME from db.{}".format(sTbName)
+    #     logger.info("Finding out tables in super table: {}".format(sql))
+    #     dbc.query(sql) # TODO: analyze result set later
+    #     logger.info("Fetching result")
+    #     rTables = dbc.getQueryResult()
+    #     logger.info("Result: {}".format(rTables))
+    # except taos.error.ProgrammingError as err:
+    #     logger.info("Initial Super table OPS error: {}".format(err))
+
+    # # sys.exit()
+    # if ( not rTables == None):
+    #     # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
+    #     try:
+    #         for rTbName in rTables : # regular tables
+    #             ds = dbState
+    #             logger.info("Inserting into table: {}".format(rTbName[0]))
+    #             sql = "insert into db.{} values ('{}', {});".format(
+    #                 rTbName[0],
+    #                 ds.getNextTick(), ds.getNextInt())
+    #             dbc.execute(sql)
+    #         for rTbName in rTables : # regular tables
+    #             dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
+    #         logger.info("Initial READING operation is successful")
+    #     except taos.error.ProgrammingError as err:
+    #         logger.info("Initial WRITE/READ error: {}".format(err))
+
+
+
+    # Sandbox testing code
+    # dbc = dbState.getDbConn()
+    # while True:
+    #     rows = dbc.query("show databases")
+    #     print("Rows: {}, time={}".format(rows, time.time()))
+
+    tc.run()
+    tc.logStats()
+    dbManager.cleanUp()
 
     # logger.info("Crash_Gen execution finished")

From b251dadab4e2225900f1d576e710eca380782d0f Mon Sep 17 00:00:00 2001
From: Steven Li
Date: Sat, 27 Jun 2020 21:23:53 -0700
Subject: [PATCH 3/3] Revert "revert crash_gen.py"

This reverts commit abb55ed0744dfe7a8cb7575af07b756e463bfa74.
---
 tests/pytest/crash_gen.py | 137 +++++++++++++++++++++-----------------
 1 file changed, 75 insertions(+), 62 deletions(-)

diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py
index cc41fd5e7d..916e8904ff 100755
--- a/tests/pytest/crash_gen.py
+++ b/tests/pytest/crash_gen.py
@@ -686,7 +686,8 @@ class StateHasData(AnyState):
                 self.assertNoTask(tasks, TaskAddData)
                 # self.hasSuccess(tasks, DeleteDataTasks)
         else: # should be STATE_HAS_DATA
-            self.assertNoTask(tasks, TaskDropDb)
+            if (not self.hasTask(tasks, TaskCreateDb) ): # only if we didn't create one
+                self.assertNoTask(tasks, TaskDropDb) # we shouldn't have dropped it
             if (not self.hasTask(tasks, TaskCreateSuperTable)) : # if we didn't create the table
                 self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it
                 # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask)
@@ -1295,7 +1296,67 @@ class LoggingFilter(logging.Filter):
             # return False
         return True
 
+class MainExec:
+    @classmethod
+    def runClient(cls):
+        # resetDb = False # DEBUG only
+        # dbState = DbState(resetDb) # DBEUG only!
+        dbManager = DbManager() # Regular function
+        Dice.seed(0) # initial seeding of dice
+        thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
+        tc = ThreadCoordinator(thPool, dbManager)
+        tc.run()
+        tc.logStats()
+        dbManager.cleanUp()
+
+    @classmethod
+    def runService(cls):
+        print("Running service...")
+
+    @classmethod
+    def runTemp(cls): # for debugging purposes
+        # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix
+        # dbc = dbState.getDbConn()
+        # sTbName = dbState.getFixedSuperTableName()
+        # dbc.execute("create database if not exists db")
+        # if not dbState.getState().equals(StateEmpty()):
+        #     dbc.execute("use db")
+
+        # rTables = None
+        # try: # the super table may not exist
+        #     sql = "select TBNAME from db.{}".format(sTbName)
+        #     logger.info("Finding out tables in super table: {}".format(sql))
+        #     dbc.query(sql) # TODO: analyze result set later
+        #     logger.info("Fetching result")
+        #     rTables = dbc.getQueryResult()
+        #     logger.info("Result: {}".format(rTables))
+        # except taos.error.ProgrammingError as err:
+        #     logger.info("Initial Super table OPS error: {}".format(err))
+
+        # # sys.exit()
+        # if ( not rTables == None):
+        #     # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
+        #     try:
+        #         for rTbName in rTables : # regular tables
+        #             ds = dbState
+        #             logger.info("Inserting into table: {}".format(rTbName[0]))
+        #             sql = "insert into db.{} values ('{}', {});".format(
+        #                 rTbName[0],
+        #                 ds.getNextTick(), ds.getNextInt())
+        #             dbc.execute(sql)
+        #         for rTbName in rTables : # regular tables
+        #             dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
+        #         logger.info("Initial READING operation is successful")
+        #     except taos.error.ProgrammingError as err:
+        #         logger.info("Initial WRITE/READ error: {}".format(err))
+
+        # Sandbox testing code
+        # dbc = dbState.getDbConn()
+        # while True:
+        #     rows = dbc.query("show databases")
+        #     print("Rows: {}, time={}".format(rows, time.time()))
+        return
 
 
 def main():
     # Super cool Python argument library: https://docs.python.org/3/library/argparse.html
@@ -1308,24 +1369,27 @@ def main():
             2. You run the server there before this script: ./build/bin/taosd -c test/cfg
             '''))
 
+    parser.add_argument('-d', '--debug', action='store_true', help='Turn on DEBUG mode for more logging (default: false)')
+    parser.add_argument('-e', '--run-tdengine', action='store_true',
+                        help='Run TDengine service in foreground (default: false)')
     parser.add_argument('-l', '--larger-data', action='store_true',
                         help='Write larger amount of data during write operations (default: false)')
-    parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
+    parser.add_argument('-p', '--per-thread-db-connection', action='store_false',
                         help='Use a single shared db connection (default: false)')
     parser.add_argument('-r', '--record-ops', action='store_true',
                        help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)')
-    parser.add_argument('-s', '--max-steps', action='store', default=100, type=int,
+    parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int,
                         help='Maximum number of steps to run (default: 100)')
-    parser.add_argument('-t', '--num-threads', action='store', default=10, type=int,
+    parser.add_argument('-t', '--num-threads', action='store', default=5, type=int,
                         help='Number of threads to run (default: 10)')
 
     global gConfig
     gConfig = parser.parse_args()
 
-    if len(sys.argv) == 1:
-        parser.print_help()
-        sys.exit()
+    # if len(sys.argv) == 1:
+    #     parser.print_help()
+    #     sys.exit()
 
     global logger
     logger = logging.getLogger('CrashGen')
@@ -1337,62 +1401,11 @@ def main():
     ch = logging.StreamHandler()
     logger.addHandler(ch)
 
-    # resetDb = False # DEBUG only
-    # dbState = DbState(resetDb) # DBEUG only!
-    dbManager = DbManager() # Regular function
-    Dice.seed(0) # initial seeding of dice
-    tc = ThreadCoordinator(
-        ThreadPool(gConfig.num_threads, gConfig.max_steps),
-        # WorkDispatcher(dbState), # Obsolete?
-        dbManager
-        )
+    if gConfig.run_tdengine : # run server
+        MainExec.runService()
+    else :
+        MainExec.runClient()
 
-    # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix
-    # dbc = dbState.getDbConn()
-    # sTbName = dbState.getFixedSuperTableName()
-    # dbc.execute("create database if not exists db")
-    # if not dbState.getState().equals(StateEmpty()):
-    #     dbc.execute("use db")
-
-    # rTables = None
-    # try: # the super table may not exist
-    #     sql = "select TBNAME from db.{}".format(sTbName)
-    #     logger.info("Finding out tables in super table: {}".format(sql))
-    #     dbc.query(sql) # TODO: analyze result set later
-    #     logger.info("Fetching result")
-    #     rTables = dbc.getQueryResult()
-    #     logger.info("Result: {}".format(rTables))
-    # except taos.error.ProgrammingError as err:
-    #     logger.info("Initial Super table OPS error: {}".format(err))
-
-    # # sys.exit()
-    # if ( not rTables == None):
-    #     # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
-    #     try:
-    #         for rTbName in rTables : # regular tables
-    #             ds = dbState
-    #             logger.info("Inserting into table: {}".format(rTbName[0]))
-    #             sql = "insert into db.{} values ('{}', {});".format(
-    #                 rTbName[0],
-    #                 ds.getNextTick(), ds.getNextInt())
-    #             dbc.execute(sql)
-    #         for rTbName in rTables : # regular tables
-    #             dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
-    #         logger.info("Initial READING operation is successful")
-    #     except taos.error.ProgrammingError as err:
-    #         logger.info("Initial WRITE/READ error: {}".format(err))
-
-
-
-    # Sandbox testing code
-    # dbc = dbState.getDbConn()
-    # while True:
-    #     rows = dbc.query("show databases")
-    #     print("Rows: {}, time={}".format(rows, time.time()))
-
-    tc.run()
-    tc.logStats()
-    dbManager.cleanUp()
 
     # logger.info("Crash_Gen execution finished")
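
Note on the pattern in PATCH 1/3 (an illustrative aside, not part of the patches themselves): the added sdbIncRef() runs before taosWriteQitem() places the SSdbOper on the sdb write queue, and the matching sdbDecRef() runs only after the worker thread has finished the write and sent the RPC response, so the record referenced by pOper->pObj cannot be released while its operation is still waiting in the queue. The sketch below shows that lifetime rule in plain C under simplified assumptions: Obj, Oper, objRef/objUnref and the fixed-size queue array are invented stand-ins for the TDengine types and for sdbIncRef/sdbDecRef/taosWriteQitem, and the real code is multi-threaded, so it would additionally need atomic reference counts and a proper queue.

/* Minimal, hypothetical sketch of "take a reference while an operation is queued". */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct {
  int  refCount;   /* simplified, non-atomic stand-in for the sdb row ref count */
  char key[32];
} Obj;

typedef struct { Obj *pObj; } Oper;

static void objRef(Obj *o) { o->refCount++; }          /* plays the role of sdbIncRef */
static void objUnref(Obj *o) {                          /* plays the role of sdbDecRef */
  if (--o->refCount == 0) { printf("free %s\n", o->key); free(o); }
}

/* producer side: take a reference *before* the oper is queued */
static void enqueueOper(Oper *queue[], int *n, Obj *o) {
  Oper *op = malloc(sizeof(Oper));
  op->pObj = o;
  objRef(o);                 /* keeps o alive while the oper waits in the queue */
  queue[(*n)++] = op;
}

/* consumer side: process each oper, then drop the queue's reference */
static void drainQueue(Oper *queue[], int n) {
  for (int i = 0; i < n; i++) {
    printf("process %s\n", queue[i]->pObj->key);
    objUnref(queue[i]->pObj);
    free(queue[i]);
  }
}

int main(void) {
  Obj *o = malloc(sizeof(Obj));
  o->refCount = 1;                       /* creator's reference */
  strcpy(o->key, "table-row");

  Oper *queue[4];
  int n = 0;
  enqueueOper(queue, &n, o);             /* refCount becomes 2 */
  objUnref(o);                           /* creator drops its ref; object stays alive because it is queued */
  drainQueue(queue, n);                  /* worker drops the last ref; object is freed only here */
  return 0;
}

Run as written, the sketch prints the processing line before the free line. Removing the objRef() call in enqueueOper() would let the object be freed while its operation still sits in the queue, which appears to be the window the refCount change in PATCH 1/3 closes.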