diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 6241c089e9..c9c8a66c7a 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1768,7 +1768,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId, tstrerror(pRspWrapper->code)); taosWLockLatch(&tmq->lock); SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); - pVg->emptyBlockReceiveTs = taosGetTimestampMs(); + if(pVg) pVg->emptyBlockReceiveTs = taosGetTimestampMs(); taosWUnLockLatch(&tmq->lock); } setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 9c6cd5c99a..18fb3cc4dd 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -164,6 +164,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -i True ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -n 3 -i True ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform.py -N 2 -n 1 +,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit.py -N 2 -n 1 ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeReplicate.py -M 3 -N 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-19201.py ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py diff --git a/tests/system-test/7-tmq/tmqVnodeSplit.py b/tests/system-test/7-tmq/tmqVnodeSplit.py index ee4dc32e40..c6cdc2bf83 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit.py @@ -28,9 +28,9 @@ class TDTestCase: tdSql.init(conn.cursor(), False) def getDataPath(self): - selfPath = os.path.dirname(os.path.realpath(__file__)) + selfPath = tdCom.getBuildPath() - return selfPath + '/../../../sim/dnode%d/data/vnode/vnode%d/wal/*'; + return selfPath + '/../sim/dnode%d/data/vnode/vnode%d/wal/*'; def prepareTestEnv(self): tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") @@ -58,25 +58,14 @@ class TDTestCase: paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl + tdCom.drop_all_db(); tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar) tdLog.info("create stb") tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) - # tdLog.info("create ctb") - # tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], - # ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) - # tdLog.info("insert data") - # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], - # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - - # tdLog.info("restart taosd to ensure that the data falls into the disk") - # tdDnodes.stop(1) - # tdDnodes.start(1) - # tdSql.query("flush database %s"%(paraDict['dbName'])) return - def restartAndRemoveWal(self): + def restartAndRemoveWal(self, deleteWal): tdDnodes = cluster.dnodes tdSql.query("select * from information_schema.ins_vnodes") for result in tdSql.queryResult: @@ -89,14 +78,17 @@ class TDTestCase: time.sleep(1) dataPath = self.getDataPath() dataPath = dataPath%(dnodeId,vnodeId) - os.system('rm -rf ' + dataPath) tdLog.debug("dataPath:%s"%dataPath) + if deleteWal: + if os.system('rm -rf ' + dataPath) != 0: + tdLog.exit("rm error") + tdDnodes[dnodeId - 1].starttaosd() time.sleep(1) break tdLog.debug("restart dnode ok") - def redistributeVgroups(self): + def splitVgroups(self): tdSql.query("select * from information_schema.ins_vnodes") vnodeId = 0 for result in tdSql.queryResult: @@ -109,7 +101,7 @@ class TDTestCase: tdSql.query(splitSql) tdLog.debug("splitSql ok") - def tmqCase1(self): + def tmqCase1(self, deleteWal=False): tdLog.printNoPrefix("======== test case 1: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, @@ -172,10 +164,10 @@ class TDTestCase: tmqCom.getStartCommitNotifyFromTmqsim() #restart dnode & remove wal - self.restartAndRemoveWal() + self.restartAndRemoveWal(deleteWal) - # redistribute vgroup - self.redistributeVgroups(); + # split vgroup + self.splitVgroups() tdLog.info("create ctb2") paraDict['ctbPrefix'] = "ctbn" @@ -195,129 +187,18 @@ class TDTestCase: # tmqCom.checkFileContent(consumerId, queryString) - time.sleep(10) + time.sleep(2) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) tdLog.printNoPrefix("======== test case 1 end ...... ") - def tmqCase2(self): - tdLog.printNoPrefix("======== test case 2: ") - paraDict = {'dbName':'dbt'} - - ntbName = "ntb" - - topicNameList = ['topic2'] - tmqCom.initConsumerTable() - - sqlString = "create table %s.%s(ts timestamp, i nchar(8))" %(paraDict['dbName'], ntbName) - tdLog.info("create nomal table sql: %s"%sqlString) - tdSql.execute(sqlString) - - tdLog.info("create topics from nomal table") - queryString = "select * from %s.%s"%(paraDict['dbName'], ntbName) - sqlString = "create topic %s as %s" %(topicNameList[0], queryString) - tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - tdSql.query("flush database %s"%(paraDict['dbName'])) - #restart dnode & remove wal - self.restartAndRemoveWal() - - # redistribute vgroup - self.redistributeVgroups(); - - sqlString = "alter table %s.%s modify column i nchar(16)" %(paraDict['dbName'], ntbName) - tdLog.info("alter table sql: %s"%sqlString) - tdSql.error(sqlString) - - time.sleep(1) - for i in range(len(topicNameList)): - tdSql.query("drop topic %s"%topicNameList[i]) - - tdLog.printNoPrefix("======== test case 2 end ...... ") - - def tmqCase3(self): - tdLog.printNoPrefix("======== test case 3: ") - paraDict = {'dbName': 'dbt', - 'dropFlag': 1, - 'event': '', - 'vgroups': 1, - 'stbName': 'stbn', - 'colPrefix': 'c', - 'tagPrefix': 't', - 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], - 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], - 'ctbPrefix': 'ctb', - 'ctbStartIdx': 0, - 'ctbNum': 10, - 'rowsPerTbl': 10000, - 'batchNum': 10, - 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 2, - 'showMsg': 1, - 'showRow': 1, - 'snapshot': 0} - - paraDict['vgroups'] = self.vgroups - paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl - - topicNameList = ['topic3'] - tmqCom.initConsumerTable() - - tdLog.info("create stb") - tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) - - tdLog.info("create ctb") - tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], - ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) - tdLog.info("insert data") - tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], - ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - - tdLog.info("create topics from stb with filter") - queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) - sqlString = "create topic %s as %s" %(topicNameList[0], queryString) - tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - - # init consume info, and start tmq_sim, then check consume result - tdLog.info("insert consume info to consume processor") - consumerId = 0 - expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] - topicList = topicNameList[0] - ifcheckdata = 1 - ifManualCommit = 1 - keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest' - tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) - - tdLog.info("start consume processor") - tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - - time.sleep(5) - #restart dnode & remove wal - self.restartAndRemoveWal() - - # redistribute vgroup - self.redistributeVgroups(); - - tdLog.info("start consume processor") - tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("wait the consume result") - - time.sleep(10) - for i in range(len(topicNameList)): - tdSql.query("drop topic %s"%topicNameList[i]) - - tdLog.printNoPrefix("======== test case 3 end ...... ") - def run(self): - tdSql.prepare() self.prepareTestEnv() - self.tmqCase1() + self.tmqCase1(True) + self.prepareTestEnv() + self.tmqCase1(False) def stop(self): tdSql.close() diff --git a/tests/system-test/7-tmq/tmqVnodeTransform.py b/tests/system-test/7-tmq/tmqVnodeTransform.py index 8b2524a6bc..fa50e46853 100644 --- a/tests/system-test/7-tmq/tmqVnodeTransform.py +++ b/tests/system-test/7-tmq/tmqVnodeTransform.py @@ -28,9 +28,9 @@ class TDTestCase: tdSql.init(conn.cursor(), False) def getDataPath(self): - selfPath = os.path.dirname(os.path.realpath(__file__)) + selfPath = tdCom.getBuildPath() - return selfPath + '/../../../sim/dnode%d/data/vnode/vnode%d/wal/*'; + return selfPath + '/../sim/dnode%d/data/vnode/vnode%d/wal/*'; def prepareTestEnv(self): tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") @@ -58,6 +58,7 @@ class TDTestCase: paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl + tdCom.drop_all_db(); tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar) tdLog.info("create stb") @@ -325,6 +326,7 @@ class TDTestCase: self.prepareTestEnv() self.tmqCase1() self.tmqCase2() + self.prepareTestEnv() self.tmqCase3() def stop(self):