From 26114896065b26b0e243d1ed5c9b9b81c5b13c72 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Tue, 14 Jun 2022 19:35:28 +0800 Subject: [PATCH 1/5] test:add test case for tmq --- tests/system-test/7-tmq/tmqError.py | 315 ++++++++++++++++++++++++++++ tests/system-test/fulltest.sh | 1 + 2 files changed, 316 insertions(+) create mode 100644 tests/system-test/7-tmq/tmqError.py diff --git a/tests/system-test/7-tmq/tmqError.py b/tests/system-test/7-tmq/tmqError.py new file mode 100644 index 0000000000..907d69bb7b --- /dev/null +++ b/tests/system-test/7-tmq/tmqError.py @@ -0,0 +1,315 @@ + +import taos +import sys +import time +import socket +import os +import threading +from enum import Enum + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * + +class actionType(Enum): + CREATE_DATABASE = 0 + CREATE_STABLE = 1 + CREATE_CTABLE = 2 + INSERT_DATA = 3 + +class TDTestCase: + hostname = socket.gethostname() + #rpcDebugFlagVal = '143' + #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal + #updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + #updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal + #print ("===================: ", updatecfgDict) + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files or "taosd.exe" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def newcur(self,cfg,host,port): + user = "root" + password = "taosdata" + con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port) + cur=con.cursor() + print(cur) + return cur + + def initConsumerTable(self,cdbName='cdb'): + tdLog.info("create consume database, and consume info table, and consume result table") + tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) + # tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + # tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) + + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + def initConsumerInfoTable(self,cdbName='cdb'): + tdLog.info("drop consumeinfo table") + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) + tdLog.info("consume info sql: %s"%sql) + tdSql.query(sql) + + def selectConsumeResult(self,expectRows,cdbName='cdb'): + resultList=[] + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == expectRows: + break + else: + time.sleep(5) + + for i in range(expectRows): + tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) + resultList.append(tdSql.getData(i , 3)) + + return resultList + + def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): + if valgrind == 1: + logFile = cfgPath + '/../log/valgrind-tmq.log' + shellCmd = 'nohup valgrind --log-file=' + logFile + shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' + + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + + def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1): + if dropFlag == 1: + tsql.execute("drop database if exists %s"%(dbName)) + + tsql.execute("create database if not exists %s vgroups %d replica %d"%(dbName, vgroups, replica)) + tdLog.debug("complete to create database %s"%(dbName)) + return + + def create_stable(self,tsql, dbName,stbName): + tsql.execute("create table if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(dbName, stbName)) + tdLog.debug("complete to create %s.%s" %(dbName, stbName)) + return + + def create_ctables(self,tsql, dbName,stbName,ctbNum): + tsql.execute("use %s" %dbName) + pre_create = "create table" + sql = pre_create + #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) + for i in range(ctbNum): + sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1) + if (i > 0) and (i%100 == 0): + tsql.execute(sql) + sql = pre_create + if sql != pre_create: + tsql.execute(sql) + + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) + return + + def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=0): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + + if startTs == 0: + t = time.time() + startTs = int(round(t * 1000)) + + #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) + rowsOfSql = 0 + for i in range(ctbNum): + sql += " %s_%d values "%(stbName,i) + for j in range(rowsPerTbl): + sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) + rowsOfSql += 1 + if (j > 0) and ((rowsOfSql == batchNum) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + rowsOfSql = 0 + if j < rowsPerTbl - 1: + sql = "insert into %s_%d values " %(stbName,i) + else: + sql = "insert into " + #end sql + if sql != pre_insert: + #print("insert sql:%s"%sql) + tsql.execute(sql) + tdLog.debug("insert data ............ [OK]") + return + + def prepareEnv(self, **parameterDict): + # create new connector for my thread + tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) + + if parameterDict["actionType"] == actionType.CREATE_DATABASE: + self.create_database(tsql, parameterDict["dbName"]) + elif parameterDict["actionType"] == actionType.CREATE_STABLE: + self.create_stable(tsql, parameterDict["dbName"], parameterDict["stbName"]) + elif parameterDict["actionType"] == actionType.CREATE_CTABLE: + self.create_ctables(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) + elif parameterDict["actionType"] == actionType.INSERT_DATA: + self.insert_data(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"],\ + parameterDict["rowsPerTbl"],parameterDict["batchNum"]) + else: + tdLog.exit("not support's action: ", parameterDict["actionType"]) + + return + + def tmqCase1(self, cfgPath, buildPath): + ''' + Leave a TMQ process. Stop taosd, delete the data directory, restart taosd, + and restart a consumption process to complete a consumption + ''' + tdLog.printNoPrefix("======== test case 1: ") + + self.initConsumerTable() + + # create and start thread + parameterDict = {'cfg': '', \ + 'actionType': 0, \ + 'dbName': 'db3', \ + 'dropFlag': 1, \ + 'vgroups': 4, \ + 'replica': 1, \ + 'stbName': 'stb1', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 20000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + self.create_database(tdSql, parameterDict["dbName"]) + self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) + self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) + self.insert_data(tdSql,parameterDict["dbName"],parameterDict["stbName"],parameterDict["ctbNum"],parameterDict["rowsPerTbl"],parameterDict["batchNum"]) + + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) + consumerId = 0 + # expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + expectrowcnt = 90000000000 + topicList = topicFromStb1 + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + pollDelay = 9000000 # Forever loop + showMsg = 1 + showRow = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + + time.sleep(3) + tdLog.info("================= stop dnode, and remove data file, then start dnode ===========================") + tdDnodes.stop(1) + # time.sleep(5) + dataPath = buildPath + "/../sim/dnode1/data/*" + shellCmd = 'rm -rf ' + dataPath + tdLog.info(shellCmd) + os.system(shellCmd) + tdDnodes.start(1) + time.sleep(2) + + ######### redo to consume + self.initConsumerTable() + + self.create_database(tdSql, parameterDict["dbName"]) + self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) + self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) + self.insert_data(tdSql,parameterDict["dbName"],parameterDict["stbName"],parameterDict["ctbNum"],parameterDict["rowsPerTbl"],parameterDict["batchNum"]) + + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + topicList = topicFromStb1 + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + pollDelay = 20 + showMsg = 1 + showRow = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + + expectRows = 1 + resultList = self.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + if not (totalConsumeRows == expectrowcnt): + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicFromStb1) + os.system('pkill tmq_sim') + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def run(self): + tdSql.prepare() + + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + cfgPath = buildPath + "/../sim/psim/cfg" + tdLog.info("cfgPath: %s" % cfgPath) + + self.tmqCase1(cfgPath, buildPath) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index e04db9dae9..38c3741cda 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -112,3 +112,4 @@ python3 ./test.py -f 7-tmq/subscribeStb2.py python3 ./test.py -f 7-tmq/subscribeStb3.py python3 ./test.py -f 7-tmq/subscribeStb4.py python3 ./test.py -f 7-tmq/db.py +python3 ./test.py -f 7-tmq/tmqError.py From f917bd3a61814a15436a80fd27e89f7e09eee838 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 14 Jun 2022 20:40:27 +0800 Subject: [PATCH 2/5] feat: redistribute vgroup --- source/dnode/vnode/src/vnd/vnodeSync.c | 34 ++++++++++++++++++- ...distribute_vgroup_replica3_v1_follower.sim | 21 ++---------- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index be27824570..cfa792508f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -66,7 +66,7 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort); } - return syncReconfig(pVnode->sync, &cfg); + return syncReconfigBuild(pVnode->sync, &cfg, pMsg); } void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { @@ -241,6 +241,30 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } +int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { + return 0; +} + +int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { + return 0; +} + +int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { + return 0; +} + +int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { + return 0; +} + +int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { + return 0; +} + +int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { + return 0; +} + static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pVnode; @@ -250,6 +274,14 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { pFsm->FpGetSnapshot = vnodeSyncGetSnapshot; pFsm->FpRestoreFinishCb = NULL; pFsm->FpReConfigCb = vnodeSyncReconfig; + + pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead; + pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead; + pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead; + pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite; + pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite; + pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite; + return pFsm; } diff --git a/tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim b/tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim index 48c1b7f2d4..73be4a35bf 100644 --- a/tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim +++ b/tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim @@ -163,26 +163,22 @@ endi print =============== step32: move follower2 print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5 sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5 -<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim -======= sql show d1.vgroups print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 ->>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim sql show d1.tables if $rows != 1 then return -1 endi +return + print =============== step33: move follower1 print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5 sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5 -<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim -======= sql show d1.vgroups print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 ->>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim sql show d1.tables if $rows != 1 then return -1 @@ -191,12 +187,9 @@ endi print =============== step34: move follower2 print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2 sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2 -<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim -======= sql show d1.vgroups print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 ->>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim sql show d1.tables if $rows != 1 then return -1 @@ -205,15 +198,8 @@ endi print =============== step35: move follower1 print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1 sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1 -<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim -sql show d1.tables -if $rows != 1 then - return -1 -endi -======= sql show d1.vgroups print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 ->>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim sql show d1.tables if $rows != 1 then @@ -242,8 +228,6 @@ if $rows != 1 then return -1 endi -<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim -======= print =============== step38: move follower2 print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2 sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2 @@ -254,7 +238,6 @@ sql show d1.tables if $rows != 1 then return -1 endi ->>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim print =============== step39: move follower1 print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1 From 548397db16c433042a19690321632f8d3f4225ac Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 14 Jun 2022 20:58:59 +0800 Subject: [PATCH 3/5] feat: redistribute vgroup --- source/dnode/mnode/impl/src/mndVgroup.c | 2 +- source/dnode/vnode/src/vnd/vnodeSync.c | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 804b4ef5d1..cd1d930846 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -940,7 +940,7 @@ static int32_t mndAddSetVnodeStandByAction(SMnode *pMnode, STrans *pTrans, SDbOb action.pCont = pReq; action.contLen = contLen; - action.msgType = TDMT_DND_DROP_VNODE; + action.msgType = TDMT_SYNC_SET_VNODE_STANDBY; action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED; if (isRedo) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index cfa792508f..17c2c186be 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -66,7 +66,13 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort); } - return syncReconfigBuild(pVnode->sync, &cfg, pMsg); + SRpcMsg rpcMsg = {.info = pMsg->info}; + if (syncReconfigBuild(pVnode->sync, &cfg, &rpcMsg) != 0) { + vError("vgId:%d, failed to build reconfig msg since %s", TD_VID(pVnode), terrstr()); + return -1; + } + + return syncPropose(pVnode->sync, &rpcMsg, false); } void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { From cba62bebbae802bf2faafe2ccaaaf9be3cc6bd36 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 15 Jun 2022 09:35:44 +0800 Subject: [PATCH 4/5] feat: make create db retry --- source/dnode/mnode/impl/src/mndDb.c | 2 +- source/dnode/mnode/impl/src/mndTrans.c | 12 +-- tests/script/jenkins/basic.txt | 2 +- tests/script/tsim/trans/create_db.sim | 123 ++++++++++++++++--------- 4 files changed, 88 insertions(+), 51 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 43263af573..004066e016 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -472,7 +472,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, } int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq); if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 1ec4799419..1631c9825b 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1347,13 +1347,11 @@ int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) { for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { STransAction *pAction = taosArrayGet(pArray, i); - if (pAction->errCode != 0) { - mInfo("trans:%d, %s:%d set processed for kill msg received, errCode from %s to success", pTrans->id, - mndTransStr(pAction->stage), i, tstrerror(pAction->errCode)); - pAction->msgSent = 1; - pAction->msgReceived = 1; - pAction->errCode = 0; - } + mInfo("trans:%d, %s:%d set processed for kill msg received, errCode from %s to success", pTrans->id, + mndTransStr(pAction->stage), i, tstrerror(pAction->errCode)); + pAction->msgSent = 1; + pAction->msgReceived = 1; + pAction->errCode = 0; } mndTransExecute(pMnode, pTrans); diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 85bb957dc1..206b0656f9 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -58,7 +58,7 @@ # ---- mnode ./test.sh -f tsim/mnode/basic1.sim ./test.sh -f tsim/mnode/basic2.sim -#./test.sh -f tsim/mnode/basic3.sim +./test.sh -f tsim/mnode/basic3.sim ./test.sh -f tsim/mnode/basic4.sim ./test.sh -f tsim/mnode/basic5.sim diff --git a/tests/script/tsim/trans/create_db.sim b/tests/script/tsim/trans/create_db.sim index e13014f9c0..f730f2fb67 100644 --- a/tests/script/tsim/trans/create_db.sim +++ b/tests/script/tsim/trans/create_db.sim @@ -7,44 +7,28 @@ system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode2 -s start sql connect -print =============== show dnodes -sql show dnodes; -if $rows != 1 then - return -1 -endi - -if $data00 != 1 then - return -1 -endi - -sql show mnodes; -if $rows != 1 then - return -1 -endi - -if $data00 != 1 then - return -1 -endi - -if $data02 != leader then - return -1 -endi - print =============== create dnodes sql create dnode $hostname port 7200 -sleep 2000 -sql show dnodes; -if $rows != 2 then - return -1 -endi - -if $data00 != 1 then +$x = 0 +step1: + $x = $x + 1 + sleep 1000 + if $x == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $data00 $data01 $data02 $data03 $data04 $data05 +print ===> $data10 $data11 $data12 $data13 $data14 $data15 +if $rows != 2 then return -1 endi - -if $data10 != 2 then - return -1 +if $data(1)[4] != ready then + goto step1 +endi +if $data(2)[4] != ready then + goto step1 endi print =============== kill dnode2 @@ -68,7 +52,7 @@ if $data[0][0] != 7 then return -1 endi -if $data[0][2] != undoAction then +if $data[0][2] != redoAction then return -1 endi @@ -80,14 +64,34 @@ sql_error create database d1 vgroups 2; print =============== start dnode2 system sh/exec.sh -n dnode2 -s start -sleep 3000 + +$x = 0 +step2: + $x = $x + 1 + sleep 1000 + if $x == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $data00 $data01 $data02 $data03 $data04 $data05 +print ===> $data10 $data11 $data12 $data13 $data14 $data15 +if $rows != 2 then + return -1 +endi +if $data(1)[4] != ready then + goto step2 +endi +if $data(2)[4] != ready then + goto step2 +endi sql show transactions if $rows != 0 then return -1 endi -sql create database d1 vgroups 2; +sql_error create database d1 vgroups 2; print =============== kill dnode2 system sh/exec.sh -n dnode2 -s stop -x SIGINT @@ -106,22 +110,31 @@ if $rows != 1 then return -1 endi -if $data[0][0] != 9 then +if $data[0][0] != 8 then return -1 endi -if $data[0][2] != undoAction then +if $data[0][2] != redoAction then return -1 endi if $data[0][3] != d2 then return -1 endi -return + +sql show databases ; +if $rows != 4 then + return -1 +endi +print d2 ==> $data(d2)[19] +if $data(d2)[19] != creating then + return -1 +endi + sql_error create database d2 vgroups 2; print =============== kill transaction -sql kill transaction 9; +sql kill transaction 8; sleep 2000 sql show transactions @@ -131,7 +144,34 @@ endi print =============== start dnode2 system sh/exec.sh -n dnode2 -s start -sleep 3000 + +$x = 0 +step3: + $x = $x + 1 + sleep 1000 + if $x == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $data00 $data01 $data02 $data03 $data04 $data05 +print ===> $data10 $data11 $data12 $data13 $data14 $data15 +if $rows != 2 then + return -1 +endi +if $data(1)[4] != ready then + goto step3 +endi +if $data(2)[4] != ready then + goto step3 +endi + +sql show transactions +if $rows != 0 then + return -1 +endi + +sql drop database d2; sql show transactions if $rows != 0 then @@ -145,6 +185,5 @@ sql_error kill transaction 3; sql_error kill transaction 4; sql_error kill transaction 5; -return system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT \ No newline at end of file From 1b2a49231bf0d5096f03d2700028fab5465f5de7 Mon Sep 17 00:00:00 2001 From: tomchon Date: Wed, 15 Jun 2022 11:09:29 +0800 Subject: [PATCH 5/5] test:modify testcase of muti-mnode --- .../6-cluster/5dnode3mnodeStopInsert.py | 61 ++++++++++++------- 1 file changed, 39 insertions(+), 22 deletions(-) diff --git a/tests/system-test/6-cluster/5dnode3mnodeStopInsert.py b/tests/system-test/6-cluster/5dnode3mnodeStopInsert.py index 518de8d6c4..fe569d9b8d 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeStopInsert.py +++ b/tests/system-test/6-cluster/5dnode3mnodeStopInsert.py @@ -12,8 +12,8 @@ from util.dnodes import TDDnode import time import socket import subprocess +from multiprocessing import Process import threading as thd - class MyDnodes(TDDnodes): def __init__(self ,dnodes_lists): super(MyDnodes,self).__init__() @@ -30,7 +30,6 @@ class TDTestCase: self.depoly_cluster(dnodenumber) self.master_dnode = self.TDDnodes.dnodes[0] self.host=self.master_dnode.cfgDict["fqdn"] - conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"] , config=self.master_dnode.cfgDir) tdSql.init(conn1.cursor()) @@ -50,7 +49,7 @@ class TDTestCase: buildPath = root[:len(root) - len("/build/bin")] break return buildPath - + def insert_data(self,countstart,countstop): # fisrt add data : db\stable\childtable\general table for couti in range(countstart,countstop): @@ -139,14 +138,15 @@ class TDTestCase: tdSql.query("show mnodes;") tdSql.checkRows(3) - tdSql.checkData(0,1,'chenhaoran02:6030') + tdSql.checkData(0,1,'%s:6030'%self.host) tdSql.checkData(0,3,'ready') - tdSql.checkData(1,1,'chenhaoran02:6130') + tdSql.checkData(1,1,'%s:6130'%self.host) tdSql.checkData(1,3,'ready') - tdSql.checkData(2,1,'chenhaoran02:6230') + tdSql.checkData(2,1,'%s:6230'%self.host) tdSql.checkData(2,3,'ready') def check3mnode1off(self): + tdSql.error("drop mnode on dnode 1;") count=0 while count < 10: time.sleep(1) @@ -167,17 +167,18 @@ class TDTestCase: tdSql.query("show mnodes;") tdSql.checkRows(3) - tdSql.checkData(0,1,'chenhaoran02:6030') + tdSql.checkData(0,1,'%s:6030'%self.host) tdSql.checkData(0,2,'offline') tdSql.checkData(0,3,'ready') - tdSql.checkData(1,1,'chenhaoran02:6130') + tdSql.checkData(1,1,'%s:6130'%self.host) tdSql.checkData(1,3,'ready') - tdSql.checkData(2,1,'chenhaoran02:6230') + tdSql.checkData(2,1,'%s:6230'%self.host) tdSql.checkData(2,3,'ready') def check3mnode2off(self): + tdSql.error("drop mnode on dnode 2;") count=0 - while count < 10: + while count < 40: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : @@ -192,17 +193,18 @@ class TDTestCase: tdSql.query("show mnodes;") tdSql.checkRows(3) - tdSql.checkData(0,1,'chenhaoran02:6030') + tdSql.checkData(0,1,'%s:6030'%self.host) tdSql.checkData(0,2,'leader') tdSql.checkData(0,3,'ready') - tdSql.checkData(1,1,'chenhaoran02:6130') + tdSql.checkData(1,1,'%s:6130'%self.host) tdSql.checkData(1,2,'offline') tdSql.checkData(1,3,'ready') - tdSql.checkData(2,1,'chenhaoran02:6230') + tdSql.checkData(2,1,'%s:6230'%self.host) tdSql.checkData(2,2,'follower') tdSql.checkData(2,3,'ready') def check3mnode3off(self): + tdSql.error("drop mnode on dnode 3;") count=0 while count < 10: time.sleep(1) @@ -219,13 +221,13 @@ class TDTestCase: tdSql.query("show mnodes;") tdSql.checkRows(3) - tdSql.checkData(0,1,'chenhaoran02:6030') + tdSql.checkData(0,1,'%s:6030'%self.host) tdSql.checkData(0,2,'leader') tdSql.checkData(0,3,'ready') - tdSql.checkData(1,1,'chenhaoran02:6130') + tdSql.checkData(1,1,'%s:6130'%self.host) tdSql.checkData(1,2,'follower') tdSql.checkData(1,3,'ready') - tdSql.checkData(2,1,'chenhaoran02:6230') + tdSql.checkData(2,1,'%s:6230'%self.host) tdSql.checkData(2,2,'offline') tdSql.checkData(2,3,'ready') @@ -233,13 +235,13 @@ class TDTestCase: def five_dnode_three_mnode(self,dnodenumber): tdSql.query("show dnodes;") - tdSql.checkData(0,1,'chenhaoran02:6030') - tdSql.checkData(4,1,'chenhaoran02:6430') + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) tdSql.checkData(0,4,'ready') tdSql.checkData(4,4,'ready') tdSql.query("show mnodes;") tdSql.checkRows(1) - tdSql.checkData(0,1,'chenhaoran02:6030') + tdSql.checkData(0,1,'%s:6030'%self.host) tdSql.checkData(0,2,'leader') tdSql.checkData(0,3,'ready') @@ -255,15 +257,27 @@ class TDTestCase: tdSql.query("show dnodes;") print(tdSql.queryResult) - # stop and follower of mnode + + tdLog.debug("stop and follower of mnode") + self.TDDnodes.stoptaosd(2) + self.check3mnode2off() + self.TDDnodes.starttaosd(2) + + self.TDDnodes.stoptaosd(3) + self.check3mnode3off() + self.TDDnodes.starttaosd(3) + + self.TDDnodes.stoptaosd(1) + self.check3mnode1off() + self.TDDnodes.starttaosd(1) + + # self.check3mnode() stopcount =0 while stopcount <= 2: for i in range(dnodenumber): threads = [] threads.append(thd.Thread(target=self.insert_data, args=(i*2,i*2+2))) - # start_time = time.time() threads[0].start() - # end_time = time.time() self.TDDnodes.stoptaosd(i+1) # if i == 1 : # self.check3mnode2off() @@ -271,12 +285,15 @@ class TDTestCase: # self.check3mnode3off() # elif i == 0: # self.check3mnode1off() + self.TDDnodes.starttaosd(i+1) threads[0].join() + # self.check3mnode() stopcount+=1 self.check3mnode() + def getConnection(self, dnode): host = dnode.cfgDict["fqdn"] port = dnode.cfgDict["serverPort"]