From 8a2760e4d00c81adbaf7e42a187d2bc9a6fe713a Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sun, 14 Jan 2024 15:39:10 +0800 Subject: [PATCH] feat: s3 support stream check --- tests/army/enterprise/s3/s3_basic.py | 30 +++++++++++++++++++++++++++- tests/army/frame/caseBase.py | 3 +++ tests/army/frame/sql.py | 5 +++++ 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/tests/army/enterprise/s3/s3_basic.py b/tests/army/enterprise/s3/s3_basic.py index f22d482502..39ab64e85d 100644 --- a/tests/army/enterprise/s3/s3_basic.py +++ b/tests/army/enterprise/s3/s3_basic.py @@ -57,13 +57,18 @@ class TDTestCase(TBase): etool.benchMark(json=json) tdSql.execute(f"use {self.db}") - # set insert data information + # come from s3_basic.json self.childtable_count = 4 self.insert_rows = 1000000 self.timestamp_step = 1000 + def createStream(self, sname): + sql = f"create stream {sname} fill_history 1 into stm1 as select count(*) from {self.db}.{self.stb} interval(10s);" + tdSql.execute(sql) + def doAction(self): tdLog.info(f"do action.") + self.flushDb() self.compactDb() @@ -80,16 +85,33 @@ class TDTestCase(TBase): time.sleep(5) self.trimDb(True) loop += 1 + tdLog.info(f"loop={loop} wait 5s...") + + def checkStreamCorrect(self): + sql = f"select count(*) from {self.db}.stm1" + count = 0 + for i in range(30): + tdSql.query(sql) + count = tdSql.getData(0, 0) + if count == 100000 or count == 100001: + return True + time.sleep(1) + + tdLog.exit(f"stream count is not expect . expect = 100000 or 100001 real={count} . sql={sql}") # run def run(self): tdLog.debug(f"start to excute {__file__}") + self.sname = "stream1" if eos.isArm64Cpu(): tdLog.success(f"{__file__} arm64 ignore executed") else: # insert data self.insertData() + # creat stream + self.createStream(self.sname) + # check insert data correct self.checkInsertCorrect() @@ -105,6 +127,12 @@ class TDTestCase(TBase): # check insert correct again self.checkInsertCorrect() + # check stream correct and drop stream + self.checkStreamCorrect() + + # drop stream + self.dropStream(self.sname) + # drop database and free s3 file self.dropDb() diff --git a/tests/army/frame/caseBase.py b/tests/army/frame/caseBase.py index c9f3aa1880..ec6b36aa1b 100644 --- a/tests/army/frame/caseBase.py +++ b/tests/army/frame/caseBase.py @@ -72,6 +72,9 @@ class TBase: def dropDb(self, show = False): tdSql.execute(f"drop database {self.db}", show = show) + def dropStream(self, sname, show = False): + tdSql.execute(f"drop stream {sname}", show = show) + def splitVGroups(self): vgids = self.getVGroup(self.db) selid = random.choice(vgids) diff --git a/tests/army/frame/sql.py b/tests/army/frame/sql.py index 2e14f0c2f0..eafae9be2d 100644 --- a/tests/army/frame/sql.py +++ b/tests/army/frame/sql.py @@ -482,6 +482,11 @@ class TDSql: time.sleep(1) pass + # execute many sql + def executes(self, sqls, queryTimes=30, show=False): + for sql in sqls: + self.execute(sql, queryTimes, show) + def checkAffectedRows(self, expectAffectedRows): if self.affectedRows != expectAffectedRows: caller = inspect.getframeinfo(inspect.stack()[1][0])