Merge pull request #22738 from taosdata/TD-25824

update test case tmqConsumerDiscontinuousData.py to fix the failed oc…
This commit is contained in:
Alex Duan 2023-09-08 09:07:41 +08:00 committed by GitHub
commit 3e71208d0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 15 additions and 7 deletions

View File

@ -205,12 +205,21 @@ class TDTestCase:
time.sleep(self.walRetentionPeriod + 1)
tdLog.info("secondely call to flash database")
tdSql.query("flush database %s"%(paraDict['dbName']))
# wait the consumer to complete one poll
while (0 == self.retryPoll):
time.sleep(1)
continue
# write data again when consumer stopped to make sure some data aren't consumed
pInsertDataAgainThread = tmqCom.asyncInsertDataByInterlace(paraDict)
pInsertDataAgainThread.join()
tdLog.info("firstly call to flash database when writing data second time")
tdSql.query("flush database %s"%(paraDict['dbName']))
time.sleep(self.walRetentionPeriod + 1)
tdLog.info("secondely call to flash database when writing data second time")
tdSql.query("flush database %s"%(paraDict['dbName']))
with self.lock:
self.retryPoll = 0
currentTime = datetime.now()
@ -218,15 +227,14 @@ class TDTestCase:
paraDict["startTs"] = 1640966400000 + paraDict["ctbNum"] * paraDict["rowsPerTbl"]
pThread3 = tmqCom.asyncInsertDataByInterlace(paraDict)
tdLog.debug("wait sub-thread to end insert data")
pThread3.join()
totalInsertRows = paraDict["ctbNum"] * paraDict["rowsPerTbl"] * 2
totalInsertRows = paraDict["ctbNum"] * paraDict["rowsPerTbl"] * 3
tdLog.debug("wait sub-thread to end consume data")
pThread2.join()
tdLog.info("act consume total rows: %d, act insert total rows: %d"%(self.actConsumeTotalRows, totalInsertRows))
if (self.actConsumeTotalRows >= totalInsertRows):