add testcase of disorder and null data in tmq test

This commit is contained in:
chenhaoran 2024-03-19 15:01:39 +08:00
parent a962441252
commit f0c6515e36
1 changed files with 28 additions and 10 deletions

View File

@ -29,9 +29,11 @@ class TDTestCase:
tdSql.execute(f'use db_stmt') tdSql.execute(f'use db_stmt')
tdSql.query("select ts,k from st") tdSql.query("select ts,k from st")
tdSql.checkRows(2) tdSql.checkRows(self.expected_affected_rows)
tdSql.execute(f'create topic t_unorder_data as select ts,k from st') tdSql.execute(f'create topic t_unorder_data as select ts,k from st')
tdSql.execute(f'create topic t_unorder_data_none as select i,k from st')
consumer_dict = { consumer_dict = {
"group.id": "g1", "group.id": "g1",
"td.connect.user": "root", "td.connect.user": "root",
@ -41,7 +43,7 @@ class TDTestCase:
consumer = Consumer(consumer_dict) consumer = Consumer(consumer_dict)
try: try:
consumer.subscribe(["t_unorder_data"]) consumer.subscribe(["t_unorder_data", "t_unorder_data_none"])
except TmqError: except TmqError:
tdLog.exit(f"subscribe error") tdLog.exit(f"subscribe error")
@ -49,19 +51,23 @@ class TDTestCase:
try: try:
while True: while True:
res = consumer.poll(1) res = consumer.poll(1)
print(res) print(cnt,res)
if not res: if not res:
if cnt == 0: print("111",cnt)
if cnt == 0 or cnt != 2*self.expected_affected_rows:
tdLog.exit("consume error") tdLog.exit("consume error")
break break
val = res.value() val = res.value()
print(val)
if val is None: if val is None:
continue continue
for block in val: for block in val:
print(block.fetchall(),len(block.fetchall()))
cnt += len(block.fetchall()) cnt += len(block.fetchall())
# print(cnt)
if cnt != 2: # if cnt != 3:
tdLog.exit("consume error") # tdLog.exit("consume error")
# print("polling")
finally: finally:
consumer.close() consumer.close()
@ -110,20 +116,32 @@ class TDTestCase:
params = new_multi_binds(2) params = new_multi_binds(2)
params[0].timestamp((1626861392589, 1626861392590)) params[0].timestamp((1626861392589, 1626861392590))
params[1].int([3, None]) params[1].int([3, None])
# print(type(stmt)) # print(type(stmt))
tdLog.debug("bind_param_batch start") tdLog.debug("bind_param_batch start")
stmt.bind_param_batch(params) stmt.bind_param_batch(params)
tdLog.debug("bind_param_batch end") tdLog.debug("bind_param_batch end")
stmt.execute() stmt.execute()
tdLog.debug("execute end") tdLog.debug("execute end")
conn.execute("flush database %s" % dbname)
params1 = new_multi_binds(2)
params1[0].timestamp((1626861392587,1626861392586))
params1[1].int([None,3])
stmt.bind_param_batch(params1)
stmt.execute()
end = datetime.now() end = datetime.now()
print("elapsed time: ", end - start) print("elapsed time: ", end - start)
assert stmt.affected_rows == 2 print(stmt.affected_rows)
self.expected_affected_rows = 4
if stmt.affected_rows != self.expected_affected_rows :
tdLog.exit("affected_rows error")
tdLog.debug("close start") tdLog.debug("close start")
stmt.close() stmt.close()
# conn.execute("drop database if exists %s" % dbname) # conn.execute("drop database if exists %s" % dbname)
conn.close() conn.close()