Merge pull request #26294 from taosdata/tests/TS-5067-3.0
test: [TS-5067] add test cases for the drop consumer group statement
This commit is contained in:
commit
aa7d4c1180
|
@ -516,17 +516,17 @@ class TDTestCase:
|
|||
"td.connect.pass": "taosdata",
|
||||
"auto.offset.reset": "earliest",
|
||||
}
|
||||
consumer = Consumer(consumer_dict)
|
||||
consumer1 = Consumer(consumer_dict)
|
||||
|
||||
try:
|
||||
consumer.subscribe(["t1"])
|
||||
consumer1.subscribe(["t1"])
|
||||
except TmqError:
|
||||
tdLog.exit(f"subscribe error")
|
||||
|
||||
index = 0
|
||||
try:
|
||||
while True:
|
||||
res = consumer.poll(1)
|
||||
res = consumer1.poll(1)
|
||||
if not res:
|
||||
if index != 1:
|
||||
tdLog.exit("consume error")
|
||||
|
@ -543,18 +543,85 @@ class TDTestCase:
|
|||
|
||||
index += 1
|
||||
finally:
|
||||
consumer.close()
|
||||
consumer1.close()
|
||||
|
||||
consumer1 = Consumer(consumer_dict)
|
||||
consumer2 = Consumer(consumer_dict)
|
||||
try:
|
||||
consumer1.subscribe(["t2"])
|
||||
consumer2.subscribe(["t2"])
|
||||
except TmqError:
|
||||
tdLog.exit(f"subscribe error")
|
||||
|
||||
tdSql.query(f'show subscriptions')
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 0, "t2")
|
||||
tdSql.checkData(0, 1, 'g1')
|
||||
tdSql.checkData(1, 0, 't1')
|
||||
tdSql.checkData(1, 1, 'g1')
|
||||
|
||||
tdSql.query(f'show consumers')
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 1, 'g1')
|
||||
tdSql.checkData(0, 4, 't2')
|
||||
tdSql.execute(f'drop consumer group g1 on t1')
|
||||
tdSql.query(f'show consumers')
|
||||
tdSql.checkRows(1)
|
||||
consumer1.close()
|
||||
tdSql.checkData(0, 1, 'g1')
|
||||
tdSql.checkData(0, 4, 't2')
|
||||
|
||||
tdSql.query(f'show subscriptions')
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, "t2")
|
||||
tdSql.checkData(0, 1, 'g1')
|
||||
|
||||
index = 0
|
||||
try:
|
||||
while True:
|
||||
res = consumer2.poll(1)
|
||||
if not res:
|
||||
if index != 1:
|
||||
tdLog.exit("consume error")
|
||||
break
|
||||
val = res.value()
|
||||
if val is None:
|
||||
continue
|
||||
cnt = 0;
|
||||
for block in val:
|
||||
cnt += len(block.fetchall())
|
||||
|
||||
if cnt != 8:
|
||||
tdLog.exit("consume error")
|
||||
|
||||
index += 1
|
||||
finally:
|
||||
consumer2.close()
|
||||
|
||||
consumer3 = Consumer(consumer_dict)
|
||||
try:
|
||||
consumer3.subscribe(["t2"])
|
||||
except TmqError:
|
||||
tdLog.exit(f"subscribe error")
|
||||
|
||||
tdSql.query(f'show consumers')
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 1, 'g1')
|
||||
tdSql.checkData(0, 4, 't2')
|
||||
|
||||
tdSql.execute(f'insert into t4 using st tags(3) values(now, 1)')
|
||||
try:
|
||||
res = consumer3.poll(1)
|
||||
if not res:
|
||||
tdLog.exit("consume1 error")
|
||||
finally:
|
||||
consumer3.close()
|
||||
|
||||
tdSql.query(f'show consumers')
|
||||
tdSql.checkRows(0)
|
||||
|
||||
tdSql.query(f'show subscriptions')
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, "t2")
|
||||
tdSql.checkData(0, 1, 'g1')
|
||||
|
||||
tdSql.execute(f'drop topic t1')
|
||||
tdSql.execute(f'drop topic t2')
|
||||
tdSql.execute(f'drop database d1')
|
||||
|
|
Loading…
Reference in New Issue