Merge pull request #22578 from taosdata/TD-25181
update test case tmqSeekAndCommit.py by charles
This commit is contained in:
commit
ebb08bf440
|
@ -150,6 +150,7 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py
|
||||||
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
|
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
|
||||||
,,n,system-test,python3 ./test.py -f 7-tmq/tmqDataPrecisionUnit.py
|
,,n,system-test,python3 ./test.py -f 7-tmq/tmqDataPrecisionUnit.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py
|
||||||
|
|
|
@ -79,7 +79,7 @@ md5sum /home/TDinternal/debug/build/lib/libtaos.so
|
||||||
#define taospy 2.7.10
|
#define taospy 2.7.10
|
||||||
pip3 list|grep taospy
|
pip3 list|grep taospy
|
||||||
pip3 uninstall taospy -y
|
pip3 uninstall taospy -y
|
||||||
pip3 install --default-timeout=120 taospy==2.7.10
|
pip3 install --default-timeout=120 taospy==2.7.12
|
||||||
|
|
||||||
$TIMEOUT_CMD $cmd
|
$TIMEOUT_CMD $cmd
|
||||||
RET=$?
|
RET=$?
|
||||||
|
|
|
@ -21,6 +21,7 @@ class TDTestCase:
|
||||||
self.db_name = "tmq_db"
|
self.db_name = "tmq_db"
|
||||||
self.topic_name = "tmq_topic"
|
self.topic_name = "tmq_topic"
|
||||||
self.stable_name = "tmqst"
|
self.stable_name = "tmqst"
|
||||||
|
self.prepareData()
|
||||||
|
|
||||||
|
|
||||||
def prepareData(self):
|
def prepareData(self):
|
||||||
|
@ -73,8 +74,9 @@ class TDTestCase:
|
||||||
return consumer
|
return consumer
|
||||||
|
|
||||||
def test_seek_and_committed_position_with_autocommit(self):
|
def test_seek_and_committed_position_with_autocommit(self):
|
||||||
|
"""Check the position and committed offset of the topic for autocommit scenario
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
self.prepareData()
|
|
||||||
inputDict = {
|
inputDict = {
|
||||||
"topic_name": self.topic_name,
|
"topic_name": self.topic_name,
|
||||||
"group_id": "1",
|
"group_id": "1",
|
||||||
|
@ -95,18 +97,26 @@ class TDTestCase:
|
||||||
|
|
||||||
partitions = consumer.assignment()
|
partitions = consumer.assignment()
|
||||||
position_partitions = consumer.position(partitions)
|
position_partitions = consumer.position(partitions)
|
||||||
tdLog.info("position_partitions: %s"%(position_partitions))
|
|
||||||
for i in range(len(position_partitions)):
|
for i in range(len(position_partitions)):
|
||||||
tdLog.info("position_partitions[%s].offset: %s"%(i, position_partitions[i].offset))
|
tdLog.info("position_partitions[%s].offset: %s"%(i, position_partitions[i].offset))
|
||||||
committed_partitions = consumer.committed(partitions)
|
committed_partitions = consumer.committed(partitions)
|
||||||
tdLog.info("committed_partitions: %s"%(committed_partitions))
|
origin_committed_position = []
|
||||||
for i in range(len(committed_partitions)):
|
for i in range(len(committed_partitions)):
|
||||||
tdLog.info("committed_partitions[%s].offset: %s"%(i, committed_partitions[i].offset))
|
tdLog.info("committed_partitions[%s].offset: %s"%(i, committed_partitions[i].offset))
|
||||||
|
origin_committed_position.append(committed_partitions[i].offset)
|
||||||
assert(len(position_partitions) == len(committed_partitions))
|
assert(len(position_partitions) == len(committed_partitions))
|
||||||
for i in range(len(position_partitions)):
|
for i in range(len(position_partitions)):
|
||||||
assert(position_partitions[i].offset == committed_partitions[i].offset)
|
assert(position_partitions[i].offset == committed_partitions[i].offset)
|
||||||
# seek to the beginning of the topic
|
# seek to the specified offset of the topic, then check position and committed offset
|
||||||
|
for partition in partitions:
|
||||||
|
partition.offset = 5
|
||||||
|
consumer.seek(partition)
|
||||||
|
position_partitions = consumer.position(partitions)
|
||||||
|
for i in range(len(position_partitions)):
|
||||||
|
assert(position_partitions[i].offset == 5)
|
||||||
|
committed_partitions = consumer.committed(partitions)
|
||||||
|
for i in range(len(committed_partitions)):
|
||||||
|
assert(committed_partitions[i].offset != 5 and committed_partitions[i].offset == origin_committed_position[i])
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
raise Exception("Failed to test seek and committed position with autocommit with error: {}".format(str(ex)))
|
raise Exception("Failed to test seek and committed position with autocommit with error: {}".format(str(ex)))
|
||||||
finally:
|
finally:
|
||||||
|
@ -114,12 +124,81 @@ class TDTestCase:
|
||||||
consumer.close()
|
consumer.close()
|
||||||
|
|
||||||
def test_commit_by_offset(self):
|
def test_commit_by_offset(self):
|
||||||
pass
|
"""Check the position and committed offset of the topic for commit by offset scenario
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
inputDict = {
|
||||||
|
"topic_name": self.topic_name,
|
||||||
|
"group_id": "1",
|
||||||
|
"auto_commit": "false",
|
||||||
|
"offset_reset": "earliest"
|
||||||
|
}
|
||||||
|
consumer = self.tmqSubscribe(inputDict)
|
||||||
|
origin_committed_position = []
|
||||||
|
while(True):
|
||||||
|
res = consumer.poll(1)
|
||||||
|
if not res:
|
||||||
|
break
|
||||||
|
err = res.error()
|
||||||
|
if err is not None:
|
||||||
|
raise err
|
||||||
|
partitions = consumer.assignment()
|
||||||
|
consumer.commit(offsets=partitions)
|
||||||
|
val = res.value()
|
||||||
|
for block in val:
|
||||||
|
tdLog.info("block.fetchall() number: %s"%(len(block.fetchall())))
|
||||||
|
position_partitions = consumer.position(partitions)
|
||||||
|
committed_partitions = consumer.committed(partitions)
|
||||||
|
for i in range(len(position_partitions)):
|
||||||
|
assert(position_partitions[i].offset == committed_partitions[i].offset)
|
||||||
|
committed_partitions = consumer.committed(partitions)
|
||||||
|
for i in range(len(committed_partitions)):
|
||||||
|
origin_committed_position.append(committed_partitions[i].offset)
|
||||||
|
tdLog.info("original committed_partitions[%s].offset: %s"%(i, committed_partitions[i].offset))
|
||||||
|
# seek to the specified offset of the topic, then check position and committed offset
|
||||||
|
for partition in partitions:
|
||||||
|
partition.offset = 2
|
||||||
|
consumer.seek(partition)
|
||||||
|
position_partitions = consumer.position(partitions)
|
||||||
|
for i in range(len(position_partitions)):
|
||||||
|
assert(position_partitions[i].offset == 2)
|
||||||
|
committed_partitions = consumer.committed(partitions)
|
||||||
|
for i in range(len(committed_partitions)):
|
||||||
|
tdLog.info("after seek committed_partitions[%s].offset: %s"%(i, committed_partitions[i].offset))
|
||||||
|
assert(committed_partitions[i].offset != 2 and committed_partitions[i].offset == origin_committed_position[i])
|
||||||
|
# continue to consume data from seek offset
|
||||||
|
while(True):
|
||||||
|
res = consumer.poll(1)
|
||||||
|
if not res:
|
||||||
|
break
|
||||||
|
err = res.error()
|
||||||
|
if err is not None:
|
||||||
|
raise err
|
||||||
|
partitions = consumer.assignment()
|
||||||
|
# commit by offset
|
||||||
|
consumer.commit(offsets=partitions)
|
||||||
|
val = res.value()
|
||||||
|
for block in val:
|
||||||
|
tdLog.info("block.fetchall() number: %s"%(len(block.fetchall())))
|
||||||
|
partitions = consumer.assignment()
|
||||||
|
position_partitions = consumer.position(partitions)
|
||||||
|
committed_partitions = consumer.committed(partitions)
|
||||||
|
assert(len(position_partitions) == len(committed_partitions))
|
||||||
|
for i in range(len(position_partitions)):
|
||||||
|
assert(position_partitions[i].offset == committed_partitions[i].offset)
|
||||||
|
except Exception as ex:
|
||||||
|
raise Exception("Failed to test commit by offset with error: {}".format(str(ex)))
|
||||||
|
finally:
|
||||||
|
consumer.unsubscribe()
|
||||||
|
consumer.close()
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.test_seek_and_committed_position_with_autocommit()
|
self.test_seek_and_committed_position_with_autocommit()
|
||||||
|
self.test_commit_by_offset()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
tdSql.execute("drop topic %s" % self.topic_name)
|
||||||
|
tdSql.execute("drop database %s"%(self.db_name))
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
tdLog.success(f"{__file__} successfully executed")
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue