diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 814c7c060c..7d730e31e1 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -402,8 +402,6 @@ typedef struct SStreamScanInfo { uint64_t numOfExec; // execution times STqReader* tqReader; - int32_t tsArrayIndex; - SArray* tsArray; uint64_t groupId; SUpdateInfo* pUpdateInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f3917dfeb1..d493adbea3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1247,30 +1247,38 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { return pBlock; } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) { qDebug("scan mode %d", pInfo->scanMode); - if (pInfo->scanMode == STREAM_SCAN_FROM_RES) { - blockDataDestroy(pInfo->pUpdateRes); - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - return pInfo->pRes; - } else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) { - generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); - return pInfo->pUpdateRes; - } else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE || pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RETRIEVE) { - SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); - if (pSDB) { - pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; - checkUpdateData(pInfo, true, pSDB, false); - return pSDB; - } - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + switch (pInfo->scanMode) { + case STREAM_SCAN_FROM_RES: { + blockDataDestroy(pInfo->pUpdateRes); + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + return pInfo->pRes; + } break; + case STREAM_SCAN_FROM_UPDATERES: { + generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); + prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + return pInfo->pUpdateRes; + } break; + case STREAM_SCAN_FROM_DATAREADER_RANGE: + case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: { + SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); + if (pSDB) { + pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; + checkUpdateData(pInfo, true, pSDB, false); + return pSDB; + } + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + } break; + default: + break; } - if (isStateWindow(pInfo) && pInfo->sessionSup.pStreamAggSup->pScanBlock->info.rows > 0) { + SStreamAggSupporter* pSup = pInfo->sessionSup.pStreamAggSup; + if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; pInfo->updateResIndex = 0; - copyDataBlock(pInfo->pUpdateRes, pInfo->sessionSup.pStreamAggSup->pScanBlock); - blockDataCleanup(pInfo->sessionSup.pStreamAggSup->pScanBlock); + copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock); + blockDataCleanup(pSup->pScanBlock); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); return pInfo->pUpdateRes; } @@ -1329,7 +1337,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); /*pOperator->status = OP_EXEC_DONE;*/ } else if (pInfo->pUpdateInfo) { - pInfo->tsArrayIndex = 0; checkUpdateData(pInfo, true, pInfo->pRes, true); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey); if (pInfo->pUpdateDataRes->info.rows > 0) { @@ -1387,7 +1394,7 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) { #if 1 if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) { STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info; - destroyTableScanOperatorInfo(pTableScanInfo, 1); + destroyTableScanOperatorInfo(pTableScanInfo, numOfOutput); } #endif if (pStreamScan->tqReader) { @@ -1401,8 +1408,8 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) { blockDataDestroy(pStreamScan->pUpdateRes); blockDataDestroy(pStreamScan->pPullDataRes); blockDataDestroy(pStreamScan->pDeleteDataRes); + blockDataDestroy(pStreamScan->pUpdateDataRes); taosArrayDestroy(pStreamScan->pBlockLists); - taosArrayDestroy(pStreamScan->tsArray); taosMemoryFree(pStreamScan); } @@ -1444,11 +1451,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys goto _error; } - pInfo->tsArray = taosArrayInit(4, sizeof(int32_t)); - if (pInfo->tsArray == NULL) { - goto _error; - } - if (pHandle->vnode) { SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 19abb88df7..025806e117 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1648,6 +1648,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { } } nodesDestroyNode((SNode*)pInfo->pPhyNode); + colDataDestroy(&pInfo->twAggSup.timeWindowData); taosMemoryFreeClear(param); } @@ -2934,9 +2935,10 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) { pBlock->info.groupId = 0; pBlock->info.rows = 0; pBlock->info.type = type; - pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t); + pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + + sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY); - pBlock->pDataBlock = taosArrayInit(3, sizeof(SColumnInfoData)); + pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData)); SColumnInfoData infoData = {0}; infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP; infoData.info.bytes = sizeof(TSKEY); diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index b7b635e28f..f2a5ba0ab5 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -207,6 +207,7 @@ void updateInfoDestroy(SUpdateInfo *pInfo) { } taosArrayDestroy(pInfo->pTsSBFs); + taosHashCleanup(pInfo->pMap); taosMemoryFree(pInfo); } diff --git a/tests/system-test/6-cluster/5dnode2mnode.py b/tests/system-test/6-cluster/5dnode2mnode.py index 7ce716c483..e4df9df4ea 100644 --- a/tests/system-test/6-cluster/5dnode2mnode.py +++ b/tests/system-test/6-cluster/5dnode2mnode.py @@ -127,52 +127,12 @@ class TDTestCase: tdLog.notice(" The election still succeeds when leader of both mnodes are killed ") except Exception: pass - tdSql.error("create user user1 pass '123';") + # tdSql.error("create user user1 pass '123';") tdLog.info("start leader") tdDnodes[0].starttaosd() if clusterComCheck.checkMnodeStatus(2) : print("both mnodes are ready") - # # fisrt drop follower of mnode - # BUG - # tdSql.execute("drop mnode on dnode 2") - # tdSql.query("show mnodes;") - # tdSql.checkRows(1) - # tdSql.checkData(0,1,'%s:6030'%self.host) - # tdSql.checkData(0,2,'leader') - # tdSql.checkData(0,3,'ready') - - # tdSql.execute("create mnode on dnode 2") - # count=0 - # while count < 10: - # time.sleep(1) - # tdSql.query("show mnodes;") - # tdSql.checkRows(2) - # if tdSql.queryResult[0][2]=='leader' : - # if tdSql.queryResult[1][2]=='follower': - # print("two mnodes is ready") - # break - # count+=1 - # else: - # print("two mnodes is not ready in 10s ") - - # tdSql.query("show mnodes;") - # tdSql.checkRows(2) - # tdSql.checkData(0,1,'%s:6030'%self.host) - # tdSql.checkData(0,2,'leader') - # tdSql.checkData(0,3,'ready') - # tdSql.checkData(1,1,'%s:6130'%self.host) - # tdSql.checkData(1,2,'follower') - # tdSql.checkData(2,3,'ready') - - - def getConnection(self, dnode): - host = dnode.cfgDict["fqdn"] - port = dnode.cfgDict["serverPort"] - config_dir = dnode.cfgDir - return taos.connect(host=host, port=int(port), config=config_dir) - - def run(self): self.five_dnode_two_mnode() diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py index bc45b10f9b..3c0e479030 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py @@ -186,7 +186,7 @@ class TDTestCase: tdLog.info("check dnode number:") clusterComCheck.checkDnodes(dnodeNumbers) tdSql.query("show databases") - tdLog.debug("we find %d databases but exepect to create %d databases "%(tdSql.queryRows-2,allDbNumbers-2)) + tdLog.debug("we find %d databases but exepect to create %d databases "%(tdSql.queryRows-2,allDbNumbers)) # tdLog.info("check DB Rows:") # clusterComCheck.checkDbRows(allDbNumbers) diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py index 36db0ffc31..f685ef2f1a 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py @@ -65,34 +65,9 @@ class TDTestCase: self._async_raise(thread.ident, SystemExit) - def insertData(self,countstart,countstop): - # fisrt add data : db\stable\childtable\general table - - for couti in range(countstart,countstop): - tdLog.debug("drop database if exists db%d" %couti) - tdSql.execute("drop database if exists db%d" %couti) - print("create database if not exists db%d replica 1 duration 300" %couti) - tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti) - tdSql.execute("use db%d" %couti) - tdSql.execute( - '''create table stb1 - (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) - tags (t1 int) - ''' - ) - tdSql.execute( - ''' - create table t1 - (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) - ''' - ) - for i in range(4): - tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') - - def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): tdLog.printNoPrefix("======== test case 1: ") - paraDict = {'dbName': 'db', + paraDict = {'dbName': 'db0_0', 'dropFlag': 1, 'event': '', 'vgroups': 4, diff --git a/tests/system-test/6-cluster/5dnode3mnodeStop2Follower.py b/tests/system-test/6-cluster/5dnode3mnodeStop2Follower.py new file mode 100644 index 0000000000..954e1ae003 --- /dev/null +++ b/tests/system-test/6-cluster/5dnode3mnodeStop2Follower.py @@ -0,0 +1,119 @@ +from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +from test import tdDnodes +sys.path.append("./6-cluster") + +from clusterCommonCreate import * +from clusterCommonCheck import * +import time +import socket +import subprocess +from multiprocessing import Process + + +class TDTestCase: + + def init(self,conn ,logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def fiveDnodeThreeMnode(self,dnodenumbers,mnodeNums,restartNumber): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 1, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1} + dnodenumbers=int(dnodenumbers) + mnodeNums=int(mnodeNums) + dbNumbers = int(dnodenumbers * restartNumber) + + tdLog.info("first check dnode and mnode") + tdSql.query("show dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodenumbers) + clusterComCheck.checkMnodeStatus(1) + + # fisr add three mnodes; + tdLog.info("fisr add three mnodes and check mnode status") + tdSql.execute("create mnode on dnode 2") + clusterComCheck.checkMnodeStatus(2) + tdSql.execute("create mnode on dnode 3") + clusterComCheck.checkMnodeStatus(3) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("show dnodes;") + # print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodenumbers) + # restart all taosd + tdDnodes=cluster.dnodes + + tdDnodes[1].stoptaosd() + tdDnodes[2].stoptaosd() + + tdLog.info("check whether 2 mnode status is offline") + clusterComCheck.check3mnode2off() + # tdSql.error("create user user1 pass '123';") + + tdLog.info("start two follower") + tdDnodes[1].starttaosd() + tdDnodes[2].starttaosd() + + clusterComCheck.checkMnodeStatus(3) + + + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(dnodenumbers=5,mnodeNums=3,restartNumber=1) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/6-cluster/clusterCommonCheck.py b/tests/system-test/6-cluster/clusterCommonCheck.py index 12ae34ac2b..294f7cf61c 100644 --- a/tests/system-test/6-cluster/clusterCommonCheck.py +++ b/tests/system-test/6-cluster/clusterCommonCheck.py @@ -205,7 +205,24 @@ class ClusterComCheck: tdLog.debug(tdSql.queryResult) tdLog.exit("stop mnodes on dnode %d failed in 10s ") - + def check3mnode2off(self,mnodeNums=3): + count=0 + while count < 10: + time.sleep(1) + tdSql.query("show mnodes;") + if tdSql.checkRows(mnodeNums) : + tdLog.success("cluster has %d mnodes" %self.mnodeNums ) + else: + tdLog.exit("mnode number is correct") + if tdSql.queryResult[0][2]=='leader' : + if tdSql.queryResult[1][2]=='offline' and tdSql.queryResult[1][3]== 'ready' : + if tdSql.queryResult[2][2]=='offline' and tdSql.queryResult[2][3]== 'ready' : + tdLog.success("stop mnodes of follower on dnode successfully in 10s") + return True + count+=1 + else: + tdLog.debug(tdSql.queryResult) + tdLog.exit("stop mnodes on dnode %d failed in 10s ") diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index 81c2becbde..c8ddc8d3c2 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -129,9 +129,12 @@ class TMQCom: def stopTmqSimProcess(self, processorName): psCmd = "ps -ef|grep -w %s|grep -v grep | awk '{print $2}'"%(processorName) processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") + onlyKillOnceWindows = 0 while(processID): - killCmd = "kill -INT %s > /dev/null 2>&1" % processID - os.system(killCmd) + if not platform.system().lower() == 'windows' or (onlyKillOnceWindows == 0 and platform.system().lower() == 'windows'): + killCmd = "kill -INT %s > /dev/null 2>&1" % processID + os.system(killCmd) + onlyKillOnceWindows = 1 time.sleep(0.2) processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") tdLog.debug("%s is stopped by kill -INT" % (processorName)) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index cade42db98..410a7e210f 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -154,21 +154,24 @@ python3 ./test.py -f 2-query/last_row.py python3 ./test.py -f 6-cluster/5dnode1mnode.py python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeStop2Follower.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3 -# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3 # python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3 -# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3 -# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 5 -M 3 +# python3 ./test.py -f 6-cluster/5dnode3mnodeRestartMnodeInsertData.py -N 5 -M 3 +# python3 ./test.py -f 6-cluster/5dnode3mnodeRestartVnodeInsertData.py -N 5 -M 3 + +python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 6 -M 3 -C 5 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py # python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 # python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3 -# BUG Redict python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 6 -M 3 -C 5 -# python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 5 -M 3 - python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 6 -M 3 -C 5 python3 ./test.py -f 7-tmq/basic5.py