From 5ad58438b658a7b56bf568afb220d3d793555822 Mon Sep 17 00:00:00 2001 From: "chao.feng" Date: Fri, 25 Aug 2023 15:17:09 +0800 Subject: [PATCH] update test case tmqSeekAndCommit.py by charles --- tests/parallel_test/cases.task | 1 + tests/parallel_test/run_case.sh | 2 +- tests/system-test/7-tmq/tmqSeekAndCommit.py | 93 +++++++++++++++++++-- 3 files changed, 88 insertions(+), 8 deletions(-) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 326a754654..5dcba314fa 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -135,6 +135,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py ,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py ,,n,system-test,python3 ./test.py -f 7-tmq/tmqDataPrecisionUnit.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py diff --git a/tests/parallel_test/run_case.sh b/tests/parallel_test/run_case.sh index 206f99ff3d..ea22d11e08 100755 --- a/tests/parallel_test/run_case.sh +++ b/tests/parallel_test/run_case.sh @@ -79,7 +79,7 @@ md5sum /home/TDinternal/debug/build/lib/libtaos.so #define taospy 2.7.10 pip3 list|grep taospy pip3 uninstall taospy -y -pip3 install --default-timeout=120 taospy==2.7.10 +pip3 install --default-timeout=120 taospy==2.7.11 $TIMEOUT_CMD $cmd RET=$? diff --git a/tests/system-test/7-tmq/tmqSeekAndCommit.py b/tests/system-test/7-tmq/tmqSeekAndCommit.py index 2d837ef7a4..253edcd10d 100644 --- a/tests/system-test/7-tmq/tmqSeekAndCommit.py +++ b/tests/system-test/7-tmq/tmqSeekAndCommit.py @@ -21,6 +21,7 @@ class TDTestCase: self.db_name = "tmq_db" self.topic_name = "tmq_topic" self.stable_name = "tmqst" + self.prepareData() def prepareData(self): @@ -73,8 +74,9 @@ class TDTestCase: return consumer def test_seek_and_committed_position_with_autocommit(self): + """Check the position and committed offset of the topic for autocommit scenario + """ try: - self.prepareData() inputDict = { "topic_name": self.topic_name, "group_id": "1", @@ -95,18 +97,26 @@ class TDTestCase: partitions = consumer.assignment() position_partitions = consumer.position(partitions) - tdLog.info("position_partitions: %s"%(position_partitions)) for i in range(len(position_partitions)): tdLog.info("position_partitions[%s].offset: %s"%(i, position_partitions[i].offset)) committed_partitions = consumer.committed(partitions) - tdLog.info("committed_partitions: %s"%(committed_partitions)) + origin_committed_position = [] for i in range(len(committed_partitions)): tdLog.info("committed_partitions[%s].offset: %s"%(i, committed_partitions[i].offset)) + origin_committed_position.append(committed_partitions[i].offset) assert(len(position_partitions) == len(committed_partitions)) for i in range(len(position_partitions)): assert(position_partitions[i].offset == committed_partitions[i].offset) - # seek to the beginning of the topic - + # seek to the specified offset of the topic, then check position and committed offset + for partition in partitions: + partition.offset = 5 + consumer.seek(partition) + position_partitions = consumer.position(partitions) + for i in range(len(position_partitions)): + assert(position_partitions[i].offset == 5) + committed_partitions = consumer.committed(partitions) + for i in range(len(committed_partitions)): + assert(committed_partitions[i].offset != 5 and committed_partitions[i].offset == origin_committed_position[i]) except Exception as ex: raise Exception("Failed to test seek and committed position with autocommit with error: {}".format(str(ex))) finally: @@ -114,12 +124,81 @@ class TDTestCase: consumer.close() def test_commit_by_offset(self): - pass - + """Check the position and committed offset of the topic for commit by offset scenario + """ + try: + inputDict = { + "topic_name": self.topic_name, + "group_id": "1", + "auto_commit": "false", + "offset_reset": "earliest" + } + consumer = self.tmqSubscribe(inputDict) + origin_committed_position = [] + while(True): + res = consumer.poll(1) + if not res: + break + err = res.error() + if err is not None: + raise err + partitions = consumer.assignment() + consumer.commit(offsets=partitions) + val = res.value() + for block in val: + tdLog.info("block.fetchall() number: %s"%(len(block.fetchall()))) + position_partitions = consumer.position(partitions) + committed_partitions = consumer.committed(partitions) + for i in range(len(position_partitions)): + assert(position_partitions[i].offset == committed_partitions[i].offset) + committed_partitions = consumer.committed(partitions) + for i in range(len(committed_partitions)): + origin_committed_position.append(committed_partitions[i].offset) + tdLog.info("original committed_partitions[%s].offset: %s"%(i, committed_partitions[i].offset)) + # seek to the specified offset of the topic, then check position and committed offset + for partition in partitions: + partition.offset = 2 + consumer.seek(partition) + position_partitions = consumer.position(partitions) + for i in range(len(position_partitions)): + assert(position_partitions[i].offset == 2) + committed_partitions = consumer.committed(partitions) + for i in range(len(committed_partitions)): + tdLog.info("after seek committed_partitions[%s].offset: %s"%(i, committed_partitions[i].offset)) + assert(committed_partitions[i].offset != 2 and committed_partitions[i].offset == origin_committed_position[i]) + # continue to consume data from seek offset + while(True): + res = consumer.poll(1) + if not res: + break + err = res.error() + if err is not None: + raise err + partitions = consumer.assignment() + # commit by offset + consumer.commit(offsets=partitions) + val = res.value() + for block in val: + tdLog.info("block.fetchall() number: %s"%(len(block.fetchall()))) + partitions = consumer.assignment() + position_partitions = consumer.position(partitions) + committed_partitions = consumer.committed(partitions) + assert(len(position_partitions) == len(committed_partitions)) + for i in range(len(position_partitions)): + assert(position_partitions[i].offset == committed_partitions[i].offset) + except Exception as ex: + raise Exception("Failed to test commit by offset with error: {}".format(str(ex))) + finally: + consumer.unsubscribe() + consumer.close() + def run(self): self.test_seek_and_committed_position_with_autocommit() + self.test_commit_by_offset() def stop(self): + tdSql.execute("drop topic %s" % self.topic_name) + tdSql.execute("drop database %s"%(self.db_name)) tdSql.close() tdLog.success(f"{__file__} successfully executed")