From 3c4663848bc05dfb769f838d0010475e98ea3a1d Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 15 Jun 2022 16:47:03 +0800 Subject: [PATCH 1/5] feat: support compute only use qnode --- include/libs/nodes/plannodes.h | 4 ++- source/libs/nodes/src/nodesCodeFuncs.c | 8 ++++- source/libs/planner/src/planScaleOut.c | 50 +++++++++++++++++++++++++- source/libs/planner/src/planSpliter.c | 19 +++++++--- 4 files changed, 74 insertions(+), 7 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 5d3e0cd142..26d055d7d2 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -177,7 +177,8 @@ typedef enum ESubplanType { SUBPLAN_TYPE_MERGE = 1, SUBPLAN_TYPE_PARTIAL, SUBPLAN_TYPE_SCAN, - SUBPLAN_TYPE_MODIFY + SUBPLAN_TYPE_MODIFY, + SUBPLAN_TYPE_COMPUTE } ESubplanType; typedef struct SSubplanId { @@ -196,6 +197,7 @@ typedef struct SLogicSubplan { SVgroupsInfo* pVgroupList; int32_t level; int32_t splitFlag; + int32_t numOfComputeNodes; } SLogicSubplan; typedef struct SQueryLogicPlan { diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index d73434f3f0..968bb75997 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1117,6 +1117,7 @@ static const char* jkLogicSubplanVgroupsSize = "VgroupsSize"; static const char* jkLogicSubplanVgroups = "Vgroups"; static const char* jkLogicSubplanLevel = "Level"; static const char* jkLogicSubplanSplitFlag = "SplitFlag"; +static const char* jkLogicSubplanNumOfComputeNodes = "NumOfComputeNodes"; static int32_t logicSubplanToJson(const void* pObj, SJson* pJson) { const SLogicSubplan* pNode = (const SLogicSubplan*)pObj; @@ -1143,6 +1144,9 @@ static int32_t logicSubplanToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkLogicSubplanSplitFlag, pNode->splitFlag); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkLogicSubplanNumOfComputeNodes, pNode->numOfComputeNodes); + } return code; } @@ -1159,7 +1163,6 @@ static int32_t jsonToLogicSubplan(const SJson* pJson, void* pObj) { } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkLogicSubplanType, pNode->subplanType, code); - ; } int32_t objSize = 0; if (TSDB_CODE_SUCCESS == code) { @@ -1174,6 +1177,9 @@ static int32_t jsonToLogicSubplan(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetIntValue(pJson, jkLogicSubplanSplitFlag, &pNode->splitFlag); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkLogicSubplanNumOfComputeNodes, &pNode->numOfComputeNodes); + } return code; } diff --git a/source/libs/planner/src/planScaleOut.c b/source/libs/planner/src/planScaleOut.c index 9707b36f4a..a0b63ad6ff 100644 --- a/source/libs/planner/src/planScaleOut.c +++ b/source/libs/planner/src/planScaleOut.c @@ -115,7 +115,45 @@ static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, } } -static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) { +static int32_t scaleOutForCompute(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) { + int32_t code = TSDB_CODE_SUCCESS; + for (int32_t i = 0; i < pSubplan->numOfComputeNodes; ++i) { + SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level); + if (NULL == pNewSubplan) { + return TSDB_CODE_OUT_OF_MEMORY; + } + code = nodesListStrictAppend(pGroup, (SNode*)pNewSubplan); + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + return code; +} + +static int32_t pushHierarchicalPlanForCompute(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) { + SNode* pChild = NULL; + SNode* pParent = NULL; + int32_t code = TSDB_CODE_SUCCESS; + FORBOTH(pChild, pCurrentGroup, pParent, pParentsGroup) { + code = nodesListMakeAppend(&(((SLogicSubplan*)pParent)->pChildren), pChild); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeAppend(&(((SLogicSubplan*)pChild)->pParents), pParent); + } + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + return code; +} + +static bool isComputeGroup(SNodeList* pGroup) { + if (0 == LIST_LENGTH(pGroup)) { + return false; + } + return SUBPLAN_TYPE_COMPUTE == ((SLogicSubplan*)nodesListGetNode(pGroup, 0))->subplanType; +} + +static int32_t pushHierarchicalPlanForNormal(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) { int32_t code = TSDB_CODE_SUCCESS; bool topLevel = (0 == LIST_LENGTH(pParentsGroup)); SNode* pChild = NULL; @@ -138,6 +176,13 @@ static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurren return code; } +static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) { + if (isComputeGroup(pParentsGroup)) { + return pushHierarchicalPlanForCompute(pParentsGroup, pCurrentGroup); + } + return pushHierarchicalPlanForNormal(pParentsGroup, pCurrentGroup); +} + static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pParentsGroup) { SNodeList* pCurrentGroup = nodesMakeList(); if (NULL == pCurrentGroup) { @@ -155,6 +200,9 @@ static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32 case SUBPLAN_TYPE_MODIFY: code = scaleOutForModify(pCxt, pSubplan, level, pCurrentGroup); break; + case SUBPLAN_TYPE_COMPUTE: + code = scaleOutForCompute(pCxt, pSubplan, level, pCurrentGroup); + break; default: break; } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 82f6b7fe14..3a9e1d7405 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -994,8 +994,20 @@ static int32_t qnodeSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { } int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType); if (TSDB_CODE_SUCCESS == code) { - code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, info.pSplitNode, 0)); + SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, info.pSplitNode, 0); + if (NULL != pScanSubplan) { + if (NULL != info.pSubplan->pVgroupList) { + info.pSubplan->numOfComputeNodes = info.pSubplan->pVgroupList->numOfVgroups; + TSWAP(pScanSubplan->pVgroupList, info.pSubplan->pVgroupList); + } else { + info.pSubplan->numOfComputeNodes = 1; + } + code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)pScanSubplan); + } else { + code = TSDB_CODE_OUT_OF_MEMORY; + } } + info.pSubplan->subplanType = SUBPLAN_TYPE_COMPUTE; ++(pCxt->groupId); pCxt->split = true; return code; @@ -1007,8 +1019,7 @@ static const SSplitRule splitRuleSet[] = { {.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit}, {.pName = "UnionAllSplit", .splitFunc = unionAllSplit}, {.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit}, - {.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}, - {.pName = "QnodeSplit", .splitFunc = qnodeSplit} + {.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit} }; // clang-format on @@ -1039,7 +1050,7 @@ static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) { } } } while (split); - return TSDB_CODE_SUCCESS; + return qnodeSplit(&cxt, pSubplan); } static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) { From b93d64c35a2d8a6b01190a99398d2e685df7dae2 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Wed, 15 Jun 2022 17:28:15 +0800 Subject: [PATCH 2/5] test: update case for fix TD-16538 --- tests/system-test/7-tmq/tmqError.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system-test/7-tmq/tmqError.py b/tests/system-test/7-tmq/tmqError.py index 907d69bb7b..5b5658d528 100644 --- a/tests/system-test/7-tmq/tmqError.py +++ b/tests/system-test/7-tmq/tmqError.py @@ -59,8 +59,8 @@ class TDTestCase: def initConsumerTable(self,cdbName='cdb'): tdLog.info("create consume database, and consume info table, and consume result table") tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) - # tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) - # tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) From 4ee4266e3c3646f7dc426a54f6edb2e9637c49bf Mon Sep 17 00:00:00 2001 From: tomchon Date: Wed, 15 Jun 2022 18:14:11 +0800 Subject: [PATCH 3/5] test:modify testcase of muti-mnode --- tests/pytest/util/dnodes.py | 32 +-- tests/pytest/util/sql.py | 1 + tests/system-test/6-cluster/5dnode2mnode.py | 6 +- .../system-test/6-cluster/5dnode3mnodeDrop.py | 91 ++++---- .../system-test/6-cluster/5dnode3mnodeStop.py | 91 ++++---- .../6-cluster/5dnode3mnodeStopInsert.py | 201 +++++++++++------- tests/system-test/fulltest.sh | 2 +- 7 files changed, 254 insertions(+), 170 deletions(-) diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 9627282ba5..be3454f78f 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -334,22 +334,22 @@ class TDDnode: bkey2 = bytes(key2, encoding="utf8") logFile = self.logDir + "/taosdlog.0" i = 0 - while not os.path.exists(logFile): - sleep(0.1) - i += 1 - if i > 10: - break - tailCmdStr = 'tail -f ' - if platform.system().lower() == 'windows': - tailCmdStr = 'tail -n +0 -f ' - popen = subprocess.Popen( - tailCmdStr + logFile, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True) - pid = popen.pid - # print('Popen.pid:' + str(pid)) - timeout = time.time() + 60 * 2 + # while not os.path.exists(logFile): + # sleep(0.1) + # i += 1 + # if i > 10: + # break + # tailCmdStr = 'tail -f ' + # if platform.system().lower() == 'windows': + # tailCmdStr = 'tail -n +0 -f ' + # popen = subprocess.Popen( + # tailCmdStr + logFile, + # stdout=subprocess.PIPE, + # stderr=subprocess.PIPE, + # shell=True) + # pid = popen.pid + # # print('Popen.pid:' + str(pid)) + # timeout = time.time() + 60 * 2 # while True: # line = popen.stdout.readline().strip() # print(line) diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index bdda7c453b..456bbf6d8b 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -156,6 +156,7 @@ class TDSql: def checkRows(self, expectRows): if self.queryRows == expectRows: tdLog.info("sql:%s, queryRows:%d == expect:%d" % (self.sql, self.queryRows, expectRows)) + return True else: caller = inspect.getframeinfo(inspect.stack()[1][0]) args = (caller.filename, caller.lineno, self.sql, self.queryRows, expectRows) diff --git a/tests/system-test/6-cluster/5dnode2mnode.py b/tests/system-test/6-cluster/5dnode2mnode.py index 9d9cc9c0d6..f3bb6df02f 100644 --- a/tests/system-test/6-cluster/5dnode2mnode.py +++ b/tests/system-test/6-cluster/5dnode2mnode.py @@ -54,7 +54,9 @@ class TDTestCase: valgrind = 0 hostname = socket.gethostname() dnodes = [] - start_port = 6030 + start_port = 6030 + start_port_sec = 6130 + for num in range(1, dnodes_nums+1): dnode = TDDnode(num) dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}") @@ -62,6 +64,8 @@ class TDTestCase: dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}") dnode.addExtraCfg("monitorFqdn", hostname) dnode.addExtraCfg("monitorPort", 7043) + dnode.addExtraCfg("secondEp", f"{hostname}:{start_port_sec}") + dnodes.append(dnode) self.TDDnodes = MyDnodes(dnodes) diff --git a/tests/system-test/6-cluster/5dnode3mnodeDrop.py b/tests/system-test/6-cluster/5dnode3mnodeDrop.py index fe85f43ea1..5b4901f1db 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeDrop.py +++ b/tests/system-test/6-cluster/5dnode3mnodeDrop.py @@ -78,6 +78,7 @@ class TDTestCase: print(hostname) dnodes = [] start_port = 6030 + start_port_sec = 6130 for num in range(1, dnodes_nums+1): dnode = TDDnode(num) dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}") @@ -85,6 +86,7 @@ class TDTestCase: dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}") dnode.addExtraCfg("monitorFqdn", hostname) dnode.addExtraCfg("monitorPort", 7043) + dnode.addExtraCfg("secondEp", f"{hostname}:{start_port_sec}") dnodes.append(dnode) self.TDDnodes = MyDnodes(dnodes) @@ -116,25 +118,28 @@ class TDTestCase: while count < 10: time.sleep(1) tdSql.query("show mnodes;") - if tdSql.checkRows(3) : - if tdSql.queryResult[0][2]=='leader' : - if tdSql.queryResult[1][2]=='follower': - if tdSql.queryResult[2][2]=='follower': - print("three mnodes is ready in 10s") - break - elif tdSql.queryResult[0][2]=='follower' : - if tdSql.queryResult[1][2]=='leader': - if tdSql.queryResult[2][2]=='follower': - print("three mnodes is ready in 10s") - break - elif tdSql.queryResult[0][2]=='follower' : - if tdSql.queryResult[1][2]=='follower': - if tdSql.queryResult[2][2]=='leader': - print("three mnodes is ready in 10s") - break + if tdSql.checkRows(3) : + print("mnode is three nodes") + if tdSql.queryResult[0][2]=='leader' : + if tdSql.queryResult[1][2]=='follower': + if tdSql.queryResult[2][2]=='follower': + print("three mnodes is ready in 10s") + break + elif tdSql.queryResult[0][2]=='follower' : + if tdSql.queryResult[1][2]=='leader': + if tdSql.queryResult[2][2]=='follower': + print("three mnodes is ready in 10s") + break + elif tdSql.queryResult[0][2]=='follower' : + if tdSql.queryResult[1][2]=='follower': + if tdSql.queryResult[2][2]=='leader': + print("three mnodes is ready in 10s") + break count+=1 else: + print(tdSql.queryResult) print("three mnodes is not ready in 10s ") + return -1 tdSql.query("show mnodes;") tdSql.checkRows(3) @@ -151,18 +156,21 @@ class TDTestCase: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - if tdSql.queryResult[0][2]=='offline' : - if tdSql.queryResult[1][2]=='leader': - if tdSql.queryResult[2][2]=='follower': - print("stop mnodes on dnode 2 successfully in 10s") - break - elif tdSql.queryResult[1][2]=='follower': - if tdSql.queryResult[2][2]=='leader': - print("stop mnodes on dnode 2 successfully in 10s") - break + print("mnode is three nodes") + if tdSql.queryResult[0][2]=='offline' : + if tdSql.queryResult[1][2]=='leader': + if tdSql.queryResult[2][2]=='follower': + print("stop mnodes on dnode 2 successfully in 10s") + break + elif tdSql.queryResult[1][2]=='follower': + if tdSql.queryResult[2][2]=='leader': + print("stop mnodes on dnode 2 successfully in 10s") + break count+=1 else: print("stop mnodes on dnode 2 failed in 10s ") + return -1 + tdSql.error("drop mnode on dnode 1;") tdSql.query("show mnodes;") tdSql.checkRows(3) @@ -174,20 +182,23 @@ class TDTestCase: tdSql.checkData(2,1,'%s:6230'%self.host) tdSql.checkData(2,3,'ready') - def check3mnode2drop(self): + def check3mnode2off(self): count=0 - while count < 10: + while count < 40: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - if tdSql.queryResult[0][2]=='leader' : - if tdSql.queryResult[1][2]=='offline': - if tdSql.queryResult[2][2]=='follower': - print("stop mnodes on dnode 2 successfully in 10s") - break + print("mnode is three nodes") + if tdSql.queryResult[0][2]=='leader' : + if tdSql.queryResult[1][2]=='offline': + if tdSql.queryResult[2][2]=='follower': + print("stop mnodes on dnode 2 successfully in 10s") + break count+=1 else: print("stop mnodes on dnode 2 failed in 10s ") + return -1 + tdSql.error("drop mnode on dnode 2;") tdSql.query("show mnodes;") tdSql.checkRows(3) @@ -197,7 +208,7 @@ class TDTestCase: tdSql.checkData(1,1,'%s:6130'%self.host) tdSql.checkData(1,2,'offline') tdSql.checkData(1,3,'ready') - tdSql.checkData(2,1,'%s:6230') + tdSql.checkData(2,1,'%s:6230'%self.host) tdSql.checkData(2,2,'follower') tdSql.checkData(2,3,'ready') @@ -207,15 +218,17 @@ class TDTestCase: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - if tdSql.queryResult[0][2]=='leader' : - if tdSql.queryResult[2][2]=='offline': - if tdSql.queryResult[1][2]=='follower': - print("stop mnodes on dnode 3 successfully in 10s") - break + print("mnode is three nodes") + if tdSql.queryResult[0][2]=='leader' : + if tdSql.queryResult[2][2]=='offline': + if tdSql.queryResult[1][2]=='follower': + print("stop mnodes on dnode 3 successfully in 10s") + break count+=1 else: print("stop mnodes on dnode 3 failed in 10s") - + return -1 + tdSql.error("drop mnode on dnode 3;") tdSql.query("show mnodes;") tdSql.checkRows(3) tdSql.checkData(0,1,'%s:6030'%self.host) diff --git a/tests/system-test/6-cluster/5dnode3mnodeStop.py b/tests/system-test/6-cluster/5dnode3mnodeStop.py index f1fe8e7458..e93d00d850 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeStop.py +++ b/tests/system-test/6-cluster/5dnode3mnodeStop.py @@ -115,25 +115,27 @@ class TDTestCase: while count < 10: time.sleep(1) tdSql.query("show mnodes;") - if tdSql.checkRows(3) : - if tdSql.queryResult[0][2]=='leader' : - if tdSql.queryResult[1][2]=='follower': - if tdSql.queryResult[2][2]=='follower': - print("three mnodes is ready in 10s") - break - elif tdSql.queryResult[0][2]=='follower' : - if tdSql.queryResult[1][2]=='leader': - if tdSql.queryResult[2][2]=='follower': - print("three mnodes is ready in 10s") - break - elif tdSql.queryResult[0][2]=='follower' : - if tdSql.queryResult[1][2]=='follower': - if tdSql.queryResult[2][2]=='leader': - print("three mnodes is ready in 10s") - break + if tdSql.checkRows(3) : + print("mnode is three nodes") + if tdSql.queryResult[0][2]=='leader' : + if tdSql.queryResult[1][2]=='follower': + if tdSql.queryResult[2][2]=='follower': + print("three mnodes is ready in 10s") + break + elif tdSql.queryResult[0][2]=='follower' : + if tdSql.queryResult[1][2]=='leader': + if tdSql.queryResult[2][2]=='follower': + print("three mnodes is ready in 10s") + break + elif tdSql.queryResult[0][2]=='follower' : + if tdSql.queryResult[1][2]=='follower': + if tdSql.queryResult[2][2]=='leader': + print("three mnodes is ready in 10s") + break count+=1 else: print("three mnodes is not ready in 10s ") + return -1 tdSql.query("show mnodes;") tdSql.checkRows(3) @@ -145,24 +147,26 @@ class TDTestCase: tdSql.checkData(2,3,'ready') def check3mnode1off(self): - tdSql.error("drop mnode on dnode 1;") count=0 while count < 10: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - if tdSql.queryResult[0][2]=='offline' : - if tdSql.queryResult[1][2]=='leader': - if tdSql.queryResult[2][2]=='follower': - print("stop mnodes on dnode 2 successfully in 10s") - break - elif tdSql.queryResult[1][2]=='follower': - if tdSql.queryResult[2][2]=='leader': - print("stop mnodes on dnode 2 successfully in 10s") - break + print("mnode is three nodes") + if tdSql.queryResult[0][2]=='offline' : + if tdSql.queryResult[1][2]=='leader': + if tdSql.queryResult[2][2]=='follower': + print("stop mnodes on dnode 2 successfully in 10s") + break + elif tdSql.queryResult[1][2]=='follower': + if tdSql.queryResult[2][2]=='leader': + print("stop mnodes on dnode 2 successfully in 10s") + break count+=1 else: print("stop mnodes on dnode 2 failed in 10s ") + return -1 + tdSql.error("drop mnode on dnode 1;") tdSql.query("show mnodes;") tdSql.checkRows(3) @@ -175,20 +179,22 @@ class TDTestCase: tdSql.checkData(2,3,'ready') def check3mnode2off(self): - tdSql.error("drop mnode on dnode 2;") count=0 - while count < 10: + while count < 40: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - if tdSql.queryResult[0][2]=='leader' : - if tdSql.queryResult[1][2]=='offline': - if tdSql.queryResult[2][2]=='follower': - print("stop mnodes on dnode 2 successfully in 10s") - break + print("mnode is three nodes") + if tdSql.queryResult[0][2]=='leader' : + if tdSql.queryResult[1][2]=='offline': + if tdSql.queryResult[2][2]=='follower': + print("stop mnodes on dnode 2 successfully in 10s") + break count+=1 else: print("stop mnodes on dnode 2 failed in 10s ") + return -1 + tdSql.error("drop mnode on dnode 2;") tdSql.query("show mnodes;") tdSql.checkRows(3) @@ -203,21 +209,22 @@ class TDTestCase: tdSql.checkData(2,3,'ready') def check3mnode3off(self): - tdSql.error("drop mnode on dnode 3;") count=0 while count < 10: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - if tdSql.queryResult[0][2]=='leader' : - if tdSql.queryResult[2][2]=='offline': - if tdSql.queryResult[1][2]=='follower': - print("stop mnodes on dnode 3 successfully in 10s") - break + print("mnode is three nodes") + if tdSql.queryResult[0][2]=='leader' : + if tdSql.queryResult[2][2]=='offline': + if tdSql.queryResult[1][2]=='follower': + print("stop mnodes on dnode 3 successfully in 10s") + break count+=1 else: print("stop mnodes on dnode 3 failed in 10s") - + return -1 + tdSql.error("drop mnode on dnode 3;") tdSql.query("show mnodes;") tdSql.checkRows(3) tdSql.checkData(0,1,'%s:6030'%self.host) @@ -264,13 +271,13 @@ class TDTestCase: self.TDDnodes.stoptaosd(3) self.check3mnode3off() - self.TDDnodes.starttaosd(2) + self.TDDnodes.starttaosd(3) self.TDDnodes.stoptaosd(1) self.check3mnode1off() self.TDDnodes.starttaosd(1) - # self.check3mnode() + self.check3mnode() stopcount =0 while stopcount <= 2: for i in range(dnodenumber): diff --git a/tests/system-test/6-cluster/5dnode3mnodeStopInsert.py b/tests/system-test/6-cluster/5dnode3mnodeStopInsert.py index fe569d9b8d..be86e4b652 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeStopInsert.py +++ b/tests/system-test/6-cluster/5dnode3mnodeStopInsert.py @@ -13,13 +13,50 @@ import time import socket import subprocess from multiprocessing import Process -import threading as thd +import threading +import time +import inspect +import ctypes class MyDnodes(TDDnodes): def __init__(self ,dnodes_lists): super(MyDnodes,self).__init__() self.dnodes = dnodes_lists # dnode must be TDDnode instance self.simDeployed = False - + +class MyThreadFunc(object): + ''' + 手动终止线程的方法 + ''' + def __init__(self, func): + self.myThread = threading.Thread(target=func) + + def start(self): + print('线程启动') + self.myThread.start() + + def stop(self): + print('线程终止') + try: + for i in range(5): + self._async_raise(self.myThread.ident, SystemExit) + time.sleep(1) + except Exception as e: + print(e) + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + tid = ctypes.c_long(tid) + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + class TDTestCase: def init(self,conn ,logSql): @@ -52,8 +89,11 @@ class TDTestCase: def insert_data(self,countstart,countstop): # fisrt add data : db\stable\childtable\general table + for couti in range(countstart,countstop): + tdLog.debug("drop database if exists db%d" %couti) tdSql.execute("drop database if exists db%d" %couti) + print("create database if not exists db%d replica 1 days 300" %couti) tdSql.execute("create database if not exists db%d replica 1 days 300" %couti) tdSql.execute("use db%d" %couti) tdSql.execute( @@ -78,6 +118,7 @@ class TDTestCase: hostname = socket.gethostname() dnodes = [] start_port = 6030 + start_port_sec = 6130 for num in range(1, dnodes_nums+1): dnode = TDDnode(num) dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}") @@ -85,6 +126,7 @@ class TDTestCase: dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}") dnode.addExtraCfg("monitorFqdn", hostname) dnode.addExtraCfg("monitorPort", 7043) + dnode.addExtraCfg("secondEp", f"{hostname}:{start_port_sec}") dnodes.append(dnode) self.TDDnodes = MyDnodes(dnodes) @@ -111,30 +153,58 @@ class TDTestCase: time.sleep(2) tdLog.info(" create cluster with %d dnode done! " %dnodes_nums) + def checkdnodes(self,dnodenumber): + count=0 + while count < 10: + time.sleep(1) + statusReadyBumber=0 + tdSql.query("show dnodes;") + if tdSql.checkRows(dnodenumber) : + print("dnode is %d nodes"%dnodenumber) + for i in range(dnodenumber): + if tdSql.queryResult[i][4] !='ready' : + status=tdSql.queryResult[i][4] + print("dnode:%d status is %s "%(i,status)) + break + else: + statusReadyBumber+=1 + print(statusReadyBumber) + if statusReadyBumber == dnodenumber : + print("all of %d mnodes is ready in 10s "%dnodenumber) + return True + break + count+=1 + else: + print("%d mnodes is not ready in 10s "%dnodenumber) + return False + + def check3mnode(self): count=0 while count < 10: time.sleep(1) tdSql.query("show mnodes;") - if tdSql.checkRows(3) : - if tdSql.queryResult[0][2]=='leader' : - if tdSql.queryResult[1][2]=='follower': - if tdSql.queryResult[2][2]=='follower': - print("three mnodes is ready in 10s") - break - elif tdSql.queryResult[0][2]=='follower' : - if tdSql.queryResult[1][2]=='leader': - if tdSql.queryResult[2][2]=='follower': - print("three mnodes is ready in 10s") - break - elif tdSql.queryResult[0][2]=='follower' : - if tdSql.queryResult[1][2]=='follower': - if tdSql.queryResult[2][2]=='leader': - print("three mnodes is ready in 10s") - break + if tdSql.checkRows(3) : + print("mnode is three nodes") + if tdSql.queryResult[0][2]=='leader' : + if tdSql.queryResult[1][2]=='follower': + if tdSql.queryResult[2][2]=='follower': + print("three mnodes is ready in 10s") + break + elif tdSql.queryResult[0][2]=='follower' : + if tdSql.queryResult[1][2]=='leader': + if tdSql.queryResult[2][2]=='follower': + print("three mnodes is ready in 10s") + break + elif tdSql.queryResult[0][2]=='follower' : + if tdSql.queryResult[1][2]=='follower': + if tdSql.queryResult[2][2]=='leader': + print("three mnodes is ready in 10s") + break count+=1 else: print("three mnodes is not ready in 10s ") + return -1 tdSql.query("show mnodes;") tdSql.checkRows(3) @@ -146,24 +216,26 @@ class TDTestCase: tdSql.checkData(2,3,'ready') def check3mnode1off(self): - tdSql.error("drop mnode on dnode 1;") count=0 while count < 10: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - if tdSql.queryResult[0][2]=='offline' : - if tdSql.queryResult[1][2]=='leader': - if tdSql.queryResult[2][2]=='follower': - print("stop mnodes on dnode 2 successfully in 10s") - break - elif tdSql.queryResult[1][2]=='follower': - if tdSql.queryResult[2][2]=='leader': - print("stop mnodes on dnode 2 successfully in 10s") - break + print("mnode is three nodes") + if tdSql.queryResult[0][2]=='offline' : + if tdSql.queryResult[1][2]=='leader': + if tdSql.queryResult[2][2]=='follower': + print("stop mnodes on dnode 2 successfully in 10s") + break + elif tdSql.queryResult[1][2]=='follower': + if tdSql.queryResult[2][2]=='leader': + print("stop mnodes on dnode 2 successfully in 10s") + break count+=1 else: print("stop mnodes on dnode 2 failed in 10s ") + return -1 + tdSql.error("drop mnode on dnode 1;") tdSql.query("show mnodes;") tdSql.checkRows(3) @@ -176,20 +248,22 @@ class TDTestCase: tdSql.checkData(2,3,'ready') def check3mnode2off(self): - tdSql.error("drop mnode on dnode 2;") count=0 while count < 40: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - if tdSql.queryResult[0][2]=='leader' : - if tdSql.queryResult[1][2]=='offline': - if tdSql.queryResult[2][2]=='follower': - print("stop mnodes on dnode 2 successfully in 10s") - break + print("mnode is three nodes") + if tdSql.queryResult[0][2]=='leader' : + if tdSql.queryResult[1][2]=='offline': + if tdSql.queryResult[2][2]=='follower': + print("stop mnodes on dnode 2 successfully in 10s") + break count+=1 else: print("stop mnodes on dnode 2 failed in 10s ") + return -1 + tdSql.error("drop mnode on dnode 2;") tdSql.query("show mnodes;") tdSql.checkRows(3) @@ -204,21 +278,22 @@ class TDTestCase: tdSql.checkData(2,3,'ready') def check3mnode3off(self): - tdSql.error("drop mnode on dnode 3;") count=0 while count < 10: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - if tdSql.queryResult[0][2]=='leader' : - if tdSql.queryResult[2][2]=='offline': - if tdSql.queryResult[1][2]=='follower': - print("stop mnodes on dnode 3 successfully in 10s") - break + print("mnode is three nodes") + if tdSql.queryResult[0][2]=='leader' : + if tdSql.queryResult[2][2]=='offline': + if tdSql.queryResult[1][2]=='follower': + print("stop mnodes on dnode 3 successfully in 10s") + break count+=1 else: print("stop mnodes on dnode 3 failed in 10s") - + return -1 + tdSql.error("drop mnode on dnode 3;") tdSql.query("show mnodes;") tdSql.checkRows(3) tdSql.checkData(0,1,'%s:6030'%self.host) @@ -231,8 +306,6 @@ class TDTestCase: tdSql.checkData(2,2,'offline') tdSql.checkData(2,3,'ready') - - def five_dnode_three_mnode(self,dnodenumber): tdSql.query("show dnodes;") tdSql.checkData(0,1,'%s:6030'%self.host) @@ -252,43 +325,29 @@ class TDTestCase: # fisrt check statut ready self.check3mnode() - tdSql.error("create mnode on dnode 2") - tdSql.query("show dnodes;") print(tdSql.queryResult) + tdLog.debug("stop all of mnode ") - tdLog.debug("stop and follower of mnode") - self.TDDnodes.stoptaosd(2) - self.check3mnode2off() - self.TDDnodes.starttaosd(2) - - self.TDDnodes.stoptaosd(3) - self.check3mnode3off() - self.TDDnodes.starttaosd(3) - - self.TDDnodes.stoptaosd(1) - self.check3mnode1off() - self.TDDnodes.starttaosd(1) - - # self.check3mnode() stopcount =0 while stopcount <= 2: for i in range(dnodenumber): - threads = [] - threads.append(thd.Thread(target=self.insert_data, args=(i*2,i*2+2))) - threads[0].start() + # threads=[] + # threads = MyThreadFunc(self.insert_data(i*2,i*2+2)) + threads=threading.Thread(target=self.insert_data, args=((stopcount+i)*2,(i+stopcount)*2+2)) + threads.start() self.TDDnodes.stoptaosd(i+1) - # if i == 1 : - # self.check3mnode2off() - # elif i == 2 : - # self.check3mnode3off() - # elif i == 0: - # self.check3mnode1off() - self.TDDnodes.starttaosd(i+1) - threads[0].join() - + + if self.checkdnodes(5): + print("123") + threads.join() + else: + print("456") + self._is_stopped = True + assert 1 == 2 ,"some dnode started failed" + return False # self.check3mnode() stopcount+=1 self.check3mnode() diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 13cb72670b..d175d78827 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -101,7 +101,7 @@ python3 ./test.py -f 2-query/tail.py python3 ./test.py -f 6-cluster/5dnode1mnode.py python3 ./test.py -f 6-cluster/5dnode2mnode.py -# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py +python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py From 7673e33b1b96fb368994e9d2f77bb4077835429c Mon Sep 17 00:00:00 2001 From: tomchon Date: Wed, 15 Jun 2022 19:27:45 +0800 Subject: [PATCH 4/5] test:modify testcase of muti-mnode --- tests/system-test/6-cluster/5dnode3mnodeDrop.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tests/system-test/6-cluster/5dnode3mnodeDrop.py b/tests/system-test/6-cluster/5dnode3mnodeDrop.py index 0ab0849426..f999a16b05 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeDrop.py +++ b/tests/system-test/6-cluster/5dnode3mnodeDrop.py @@ -275,8 +275,25 @@ class TDTestCase: for i in range(1,3): tdLog.debug("drop mnode on dnode %d"%(i+1)) tdSql.execute("drop mnode on dnode %d"%(i+1)) + tdSql.query("show mnodes;") + count=0 + while count<10: + time.sleep(1) + tdSql.query("show mnodes;") + if tdSql.checkRows(2): + print("drop mnode %d successfully"%(i+1)) + break + count+=1 tdLog.debug("create mnode on dnode %d"%(i+1)) tdSql.execute("create mnode on dnode %d"%(i+1)) + count=0 + while count<10: + time.sleep(1) + tdSql.query("show mnodes;") + if tdSql.checkRows(3): + print("drop mnode %d successfully"%(i+1)) + break + count+=1 dropcount+=1 self.check3mnode() From 96021a4c6577ec9c52701ab68ba0205f4815da83 Mon Sep 17 00:00:00 2001 From: tomchon Date: Wed, 15 Jun 2022 20:57:18 +0800 Subject: [PATCH 5/5] test:modify testcase of muti-mnode --- .../6-cluster/5dnode3mnodeStopInsert.py | 56 +++++++------------ 1 file changed, 20 insertions(+), 36 deletions(-) diff --git a/tests/system-test/6-cluster/5dnode3mnodeStopInsert.py b/tests/system-test/6-cluster/5dnode3mnodeStopInsert.py index e8f850bf6f..ce3f2c4093 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeStopInsert.py +++ b/tests/system-test/6-cluster/5dnode3mnodeStopInsert.py @@ -23,39 +23,6 @@ class MyDnodes(TDDnodes): self.dnodes = dnodes_lists # dnode must be TDDnode instance self.simDeployed = False -class MyThreadFunc(object): - ''' - 手动终止线程的方法 - ''' - def __init__(self, func): - self.myThread = threading.Thread(target=func) - - def start(self): - print('线程启动') - self.myThread.start() - - def stop(self): - print('线程终止') - try: - for i in range(5): - self._async_raise(self.myThread.ident, SystemExit) - time.sleep(1) - except Exception as e: - print(e) - - def _async_raise(self, tid, exctype): - """raises the exception, performs cleanup if needed""" - tid = ctypes.c_long(tid) - if not inspect.isclass(exctype): - exctype = type(exctype) - res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) - if res == 0: - raise ValueError("invalid thread id") - elif res != 1: - # """if it returns a number greater than one, you're in trouble, - # and you should call it again with exc=NULL to revert the effect""" - ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) - raise SystemError("PyThreadState_SetAsyncExc failed") class TDTestCase: @@ -86,7 +53,24 @@ class TDTestCase: buildPath = root[:len(root) - len("/build/bin")] break return buildPath - + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stop_thread(self,thread): + self._async_raise(thread.ident, SystemExit) + + def insert_data(self,countstart,countstop): # fisrt add data : db\stable\childtable\general table @@ -339,13 +323,13 @@ class TDTestCase: threads.start() self.TDDnodes.stoptaosd(i+1) self.TDDnodes.starttaosd(i+1) - + if self.checkdnodes(5): print("123") threads.join() else: print("456") - self._is_stopped = True + self.stop_thread(threads) assert 1 == 2 ,"some dnode started failed" return False # self.check3mnode()