diff --git a/tests/system-test/7-tmq/tmqConsumeDiscontinuousData.py b/tests/system-test/7-tmq/tmqConsumeDiscontinuousData.py index 3dabca4cd1..dd2420dcfb 100644 --- a/tests/system-test/7-tmq/tmqConsumeDiscontinuousData.py +++ b/tests/system-test/7-tmq/tmqConsumeDiscontinuousData.py @@ -205,12 +205,21 @@ class TDTestCase: time.sleep(self.walRetentionPeriod + 1) tdLog.info("secondely call to flash database") tdSql.query("flush database %s"%(paraDict['dbName'])) - + # wait the consumer to complete one poll while (0 == self.retryPoll): time.sleep(1) continue - + + # write data again when consumer stopped to make sure some data aren't consumed + pInsertDataAgainThread = tmqCom.asyncInsertDataByInterlace(paraDict) + pInsertDataAgainThread.join() + tdLog.info("firstly call to flash database when writing data second time") + tdSql.query("flush database %s"%(paraDict['dbName'])) + time.sleep(self.walRetentionPeriod + 1) + tdLog.info("secondely call to flash database when writing data second time") + tdSql.query("flush database %s"%(paraDict['dbName'])) + with self.lock: self.retryPoll = 0 currentTime = datetime.now() @@ -218,15 +227,14 @@ class TDTestCase: paraDict["startTs"] = 1640966400000 + paraDict["ctbNum"] * paraDict["rowsPerTbl"] pThread3 = tmqCom.asyncInsertDataByInterlace(paraDict) - - + tdLog.debug("wait sub-thread to end insert data") pThread3.join() - - totalInsertRows = paraDict["ctbNum"] * paraDict["rowsPerTbl"] * 2 + + totalInsertRows = paraDict["ctbNum"] * paraDict["rowsPerTbl"] * 3 tdLog.debug("wait sub-thread to end consume data") pThread2.join() - + tdLog.info("act consume total rows: %d, act insert total rows: %d"%(self.actConsumeTotalRows, totalInsertRows)) if (self.actConsumeTotalRows >= totalInsertRows):