diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 00171a19a6..7f6b129bb9 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -307,8 +307,8 @@ class TDSql: return col_name_list, col_type_list return col_name_list - def waitedQuery(self, sql, expectRows, timeout): - tdLog.info("sql: %s, try to retrieve %d rows in %d seconds" % (sql, expectRows, timeout)) + def waitedQuery(self, sql, expectedRows, timeout): + tdLog.info("sql: %s, try to retrieve %d rows in %d seconds" % (sql, expectedRows, timeout)) self.sql = sql try: for i in range(timeout): @@ -316,8 +316,8 @@ class TDSql: self.queryResult = self.cursor.fetchall() self.queryRows = len(self.queryResult) self.queryCols = len(self.cursor.description) - tdLog.info("sql: %s, try to retrieve %d rows,get %d rows" % (sql, expectRows, self.queryRows)) - if self.queryRows >= expectRows: + tdLog.info("sql: %s, try to retrieve %d rows,get %d rows" % (sql, expectedRows, self.queryRows)) + if self.queryRows >= expectedRows: return (self.queryRows, i) time.sleep(1) except Exception as e: @@ -330,15 +330,26 @@ class TDSql: def getRows(self): return self.queryRows - def checkRows(self, expectRows): - if self.queryRows == expectRows: - tdLog.info("sql:%s, queryRows:%d == expect:%d" % (self.sql, self.queryRows, expectRows)) + def checkRows(self, expectedRows): + if self.queryRows == expectedRows: + tdLog.info("sql:%s, queryRows:%d == expect:%d" % (self.sql, self.queryRows, expectedRows)) return True else: caller = inspect.getframeinfo(inspect.stack()[1][0]) - args = (caller.filename, caller.lineno, self.sql, self.queryRows, expectRows) + args = (caller.filename, caller.lineno, self.sql, self.queryRows, expectedRows) tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args) + def checkRows_not_exited(self, expectedRows): + """ + Check if the query rows is equal to the expected rows + :param expectedRows: The expected number of rows. + :return: Returns True if the actual number of rows matches the expected number, otherwise returns False. + """ + if self.queryRows == expectedRows: + return True + else: + return False + def checkRows_range(self, excepte_row_list): if self.queryRows in excepte_row_list: tdLog.info(f"sql:{self.sql}, queryRows:{self.queryRows} in expect:{excepte_row_list}") @@ -508,7 +519,7 @@ class TDSql: # return true or false replace exit, no print out - def checkDataNoExit(self, row, col, data): + def checkDataNotExit(self, row, col, data): if self.checkRowColNoExit(row, col) == False: return False if self.queryResult[row][col] != data: @@ -542,7 +553,7 @@ class TDSql: # loop check util checkData return true for i in range(loopCount): self.query(sql) - if self.checkDataNoExit(row, col, data) : + if self.checkDataNotExit(row, col, data) : self.checkData(row, col, data) return time.sleep(waitTime) @@ -551,6 +562,19 @@ class TDSql: self.query(sql) self.checkData(row, col, data) + def check_rows_loop(self, expectedRows, sql, loopCount, waitTime): + # loop check util checkData return true + for i in range(loopCount): + self.query(sql) + if self.checkRows_not_exited(expectedRows): + return + else: + time.sleep(waitTime) + continue + # last check + self.query(sql) + self.checkRows(expectedRows) + def getData(self, row, col): self.checkRowCol(row, col) diff --git a/tests/system-test/8-stream/stream_multi_agg.py b/tests/system-test/8-stream/stream_multi_agg.py index 32c2648b46..1386814f0c 100644 --- a/tests/system-test/8-stream/stream_multi_agg.py +++ b/tests/system-test/8-stream/stream_multi_agg.py @@ -37,26 +37,26 @@ class TDTestCase: def case1(self): tdLog.debug("========case1 start========") - os.system("nohup taosBenchmark -y -B 1 -t 40 -S 1000 -n 10 -i 1000 -v 5 > /dev/null 2>&1 &") + os.system(" taosBenchmark -y -B 1 -t 10 -S 1000 -n 10 -i 1000 -v 5 ") time.sleep(10) tdSql.execute("use test", queryTimes=100) tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)") - tdLog.debug("========create stream and insert data ok========") - time.sleep(20) + tdLog.debug("========create stream and insert data ok========") tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart") rowCnt = tdSql.getRows() - results = [] - for i in range(rowCnt): - results.append(tdSql.getData(i,1)) + results_meters = tdSql.queryResult - tdSql.query("select * from st1 order by groupid,_wstart") - tdSql.checkRows(rowCnt) + sql = "select _wstart,`sum(voltage)`,groupid from st1 order by groupid,_wstart" + tdSql.check_rows_loop(rowCnt, sql, loopCount=100, waitTime=0.5) + + tdSql.query(sql) + results_st1 = tdSql.queryResult for i in range(rowCnt): - data1 = tdSql.getData(i,1) - data2 = results[i] + data1 = results_st1[i] + data2 = results_meters[i] if data1 != data2: - tdLog.info("num: %d, act data: %d, expect data: %d"%(i, data1, data2)) + tdLog.info(f"num: {i}, act data: {data1}, expect data: {data2}") tdLog.exit("check data error!") tdLog.debug("case1 end") @@ -64,7 +64,7 @@ class TDTestCase: def case2(self): tdLog.debug("========case2 start========") - os.system("taosBenchmark -d db -t 20 -v 6 -n 1000 -y > /dev/null 2>&1") + os.system("taosBenchmark -d db -t 20 -v 6 -n 1000 -y") # create stream tdSql.execute("use db", queryTimes=100) tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True) @@ -73,7 +73,7 @@ class TDTestCase: sql = "select count(*) from sta" # loop wait max 60s to check count is ok tdLog.info("loop wait result ...") - tdSql.checkDataLoop(0, 0, 100, sql, loopCount=10, waitTime=0.5) + tdSql.checkDataLoop(0, 0, 100, sql, loopCount=100, waitTime=0.5) # check all data is correct sql = "select * from sta where cnt != 200;"