feat: s3 support stream check
This commit is contained in:
parent
9b94b7bf00
commit
8a2760e4d0
|
@ -57,13 +57,18 @@ class TDTestCase(TBase):
|
|||
etool.benchMark(json=json)
|
||||
|
||||
tdSql.execute(f"use {self.db}")
|
||||
# set insert data information
|
||||
# come from s3_basic.json
|
||||
self.childtable_count = 4
|
||||
self.insert_rows = 1000000
|
||||
self.timestamp_step = 1000
|
||||
|
||||
def createStream(self, sname):
|
||||
sql = f"create stream {sname} fill_history 1 into stm1 as select count(*) from {self.db}.{self.stb} interval(10s);"
|
||||
tdSql.execute(sql)
|
||||
|
||||
def doAction(self):
|
||||
tdLog.info(f"do action.")
|
||||
|
||||
self.flushDb()
|
||||
self.compactDb()
|
||||
|
||||
|
@ -80,16 +85,33 @@ class TDTestCase(TBase):
|
|||
time.sleep(5)
|
||||
self.trimDb(True)
|
||||
loop += 1
|
||||
tdLog.info(f"loop={loop} wait 5s...")
|
||||
|
||||
def checkStreamCorrect(self):
|
||||
sql = f"select count(*) from {self.db}.stm1"
|
||||
count = 0
|
||||
for i in range(30):
|
||||
tdSql.query(sql)
|
||||
count = tdSql.getData(0, 0)
|
||||
if count == 100000 or count == 100001:
|
||||
return True
|
||||
time.sleep(1)
|
||||
|
||||
tdLog.exit(f"stream count is not expect . expect = 100000 or 100001 real={count} . sql={sql}")
|
||||
|
||||
# run
|
||||
def run(self):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
self.sname = "stream1"
|
||||
if eos.isArm64Cpu():
|
||||
tdLog.success(f"{__file__} arm64 ignore executed")
|
||||
else:
|
||||
# insert data
|
||||
self.insertData()
|
||||
|
||||
# creat stream
|
||||
self.createStream(self.sname)
|
||||
|
||||
# check insert data correct
|
||||
self.checkInsertCorrect()
|
||||
|
||||
|
@ -105,6 +127,12 @@ class TDTestCase(TBase):
|
|||
# check insert correct again
|
||||
self.checkInsertCorrect()
|
||||
|
||||
# check stream correct and drop stream
|
||||
self.checkStreamCorrect()
|
||||
|
||||
# drop stream
|
||||
self.dropStream(self.sname)
|
||||
|
||||
# drop database and free s3 file
|
||||
self.dropDb()
|
||||
|
||||
|
|
|
@ -72,6 +72,9 @@ class TBase:
|
|||
def dropDb(self, show = False):
|
||||
tdSql.execute(f"drop database {self.db}", show = show)
|
||||
|
||||
def dropStream(self, sname, show = False):
|
||||
tdSql.execute(f"drop stream {sname}", show = show)
|
||||
|
||||
def splitVGroups(self):
|
||||
vgids = self.getVGroup(self.db)
|
||||
selid = random.choice(vgids)
|
||||
|
|
|
@ -482,6 +482,11 @@ class TDSql:
|
|||
time.sleep(1)
|
||||
pass
|
||||
|
||||
# execute many sql
|
||||
def executes(self, sqls, queryTimes=30, show=False):
|
||||
for sql in sqls:
|
||||
self.execute(sql, queryTimes, show)
|
||||
|
||||
def checkAffectedRows(self, expectAffectedRows):
|
||||
if self.affectedRows != expectAffectedRows:
|
||||
caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||
|
|
Loading…
Reference in New Issue