diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 40f3cf3971..b174414432 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -80,7 +80,7 @@ int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, S } int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) { - const int32_t MAX_ROWS_TO_RETURN = 4096; + const int32_t MAX_ROWS_TO_RETURN = 1; int32_t vgId = TD_VID(pTq->pVnode); int32_t code = 0; diff --git a/tests/system-test/7-tmq/tmq_primary_key.py b/tests/system-test/7-tmq/tmq_primary_key.py index a14442bbf8..1ab83a9eaf 100644 --- a/tests/system-test/7-tmq/tmq_primary_key.py +++ b/tests/system-test/7-tmq/tmq_primary_key.py @@ -28,8 +28,225 @@ class TDTestCase: tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) + def primaryKeyTestIntQuery(self): + print("==============Case 1: primary key test int for query") + tdSql.execute(f'create database if not exists db_pk_query vgroups 1 wal_retention_period 3600;') + tdSql.execute(f'use db_pk_query;') + tdSql.execute(f'create table if not exists pk (ts timestamp, c1 int primary key, c2 int);') + tdSql.execute(f'insert into pk values(1669092069068, 0, 1);') + tdSql.execute(f'insert into pk values(1669092069068, 6, 1);') + tdSql.execute(f'flush database db_pk_query') + + tdSql.execute(f'insert into pk values(1669092069069, 0, 1) (1669092069069, 1, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 2, 1) (1669092069069, 3, 1);') + tdSql.execute(f'insert into pk values(1669092069068, 10, 1) (1669092069068, 16, 1);') + + tdSql.execute(f'create topic topic_pk_query as select * from pk') + + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + "enable.auto.commit": "false", + "experimental.snapshot.enable": "true", + } + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["topic_pk_query"]) + except TmqError: + tdLog.exit(f"subscribe error") + + try: + while True: + res = consumer.poll(1) + if not res: + break + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + for element in data: + print(element) + if len(data) != 2: + tdLog.exit(f"fetchall len != 2") + if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 0, 1), + (datetime(2022, 11, 22, 12, 41, 9, 68000), 6, 1)]: + tdLog.exit(f"data error") + + consumer.commit(res) + break + finally: + consumer.close() + + tdSql.query(f'show subscriptions;') + sub = tdSql.getData(0, 4); + print(sub) + if not sub.startswith("tsdb"): + tdLog.exit(f"show subscriptions error") + + tdSql.execute(f'use db_pk_query;') + tdSql.execute(f'insert into pk values(1669092069069, 10, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 5, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 12, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 7, 1);') + + tdSql.execute(f'flush database db_pk_query') + + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["topic_pk_query"]) + except TmqError: + tdLog.exit(f"subscribe error") + + index = 0 + try: + while True: + res = consumer.poll(1) + if not res: + break + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + for element in data: + print(element) + if index == 0: + if len(data) != 6: + tdLog.exit(f"fetchall len != 6") + if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 10, 1), + (datetime(2022, 11, 22, 12, 41, 9, 68000), 16, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 0, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 1, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 2, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 3, 1)]: + tdLog.exit(f"data error") + if index >= 1: + if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 10, 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 5, 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 12, 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 7, 1)]: + tdLog.exit(f"data error") + index += 1 + print("index:" + str(index)) + finally: + consumer.close() + + def primaryKeyTestIntStable(self): + print("==============Case 1: primary key test int for stable") + tdSql.execute(f'create database if not exists db_pk_stable vgroups 1 wal_retention_period 3600;') + tdSql.execute(f'use db_pk_stable;') + tdSql.execute(f'create table if not exists pks (ts timestamp, c1 int primary key, c2 int) tags (t int);') + tdSql.execute(f'create table if not exists pk using pks tags(1);') + tdSql.execute(f'insert into pk values(1669092069068, 0, 1);') + tdSql.execute(f'insert into pk values(1669092069068, 6, 1);') + tdSql.execute(f'flush database db_pk_stable') + + tdSql.execute(f'insert into pk values(1669092069069, 0, 1) (1669092069069, 1, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 2, 1) (1669092069069, 3, 1);') + tdSql.execute(f'insert into pk values(1669092069068, 10, 1) (1669092069068, 16, 1);') + + tdSql.execute(f'create topic topic_pk_stable as stable pks') + + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + "enable.auto.commit": "false", + "experimental.snapshot.enable": "true", + } + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["topic_pk_stable"]) + except TmqError: + tdLog.exit(f"subscribe error") + + try: + while True: + res = consumer.poll(1) + if not res: + break + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + for element in data: + print(element) + if len(data) != 2: + tdLog.exit(f"fetchall len != 2") + if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 0, 1), + (datetime(2022, 11, 22, 12, 41, 9, 68000), 6, 1)]: + tdLog.exit(f"data error") + + consumer.commit(res) + break + finally: + consumer.close() + + tdSql.query(f'show subscriptions;') + sub = tdSql.getData(0, 4); + print(sub) + if not sub.startswith("tsdb"): + tdLog.exit(f"show subscriptions error") + + tdSql.execute(f'use db_pk_stable;') + tdSql.execute(f'insert into pk values(1669092069069, 10, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 5, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 12, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 7, 1);') + + tdSql.execute(f'flush database db_pk_stable') + + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["topic_pk_stable"]) + except TmqError: + tdLog.exit(f"subscribe error") + + index = 0 + try: + while True: + res = consumer.poll(1) + if not res: + break + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + for element in data: + print(element) + if index == 0: + if len(data) != 6: + tdLog.exit(f"fetchall len != 6") + if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 10, 1), + (datetime(2022, 11, 22, 12, 41, 9, 68000), 16, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 0, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 1, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 2, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 3, 1)]: + tdLog.exit(f"data error") + if index >= 1: + if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 10, 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 5, 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 12, 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 7, 1)]: + tdLog.exit(f"data error") + index += 1 + print("index:" + str(index)) + finally: + consumer.close() + def primaryKeyTestInt(self): - print("==============Case 1: primary key test int") + print("==============Case 1: primary key test int for db") tdSql.execute(f'create database if not exists abc1 vgroups 1 wal_retention_period 3600;') tdSql.execute(f'use abc1;') tdSql.execute(f'create table if not exists pk (ts timestamp, c1 int primary key, c2 int);') @@ -125,23 +342,24 @@ class TDTestCase: (datetime(2022, 11, 22, 12, 41, 9, 69000), 2, 1), (datetime(2022, 11, 22, 12, 41, 9, 69000), 3, 1)]: tdLog.exit(f"data error") - if index == 1: + if index >= 1: if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 10, 1)] \ and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 5, 1)] \ and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 12, 1)] \ and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 7, 1)]: tdLog.exit(f"data error") index += 1 + print("index:" + str(index)) finally: consumer.close() def primaryKeyTestString(self): - print("==============Case 2: primary key test string") + print("==============Case 2: primary key test string for db") tdSql.execute(f'create database if not exists db_pk_string vgroups 1 wal_retention_period 3600;') tdSql.execute(f'use db_pk_string;') tdSql.execute(f'create table if not exists pk (ts timestamp, c1 varchar(64) primary key, c2 int);') - tdSql.execute(f'insert into pk values(1669092069068, "hello", 1);') - tdSql.execute(f'insert into pk values(1669092069068, "word", 1);') + tdSql.execute(f'insert into pk values(1669092069068, "ahello", 1);') + tdSql.execute(f'insert into pk values(1669092069068, "aworld", 1);') tdSql.execute(f'flush database db_pk_string') tdSql.execute(f'insert into pk values(1669092069069, "him", 1) (1669092069069, "value", 1);') @@ -179,9 +397,9 @@ class TDTestCase: print(element) if len(data) != 2: tdLog.exit(f"fetchall len != 2") - # if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), "hello", 1), - # (datetime(2022, 11, 22, 12, 41, 9, 68000), "world", 1)]: - # tdLog.exit(f"data error") + if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 'ahello', 1), + (datetime(2022, 11, 22, 12, 41, 9, 68000), 'aworld', 1)]: + tdLog.exit(f"data error") consumer.commit(res) break @@ -207,19 +425,23 @@ class TDTestCase: tdSql.execute(f'flush database db_pk_string') consumer = Consumer(consumer_dict) - + print(1) try: consumer.subscribe(["topic_pk_string"]) except TmqError: tdLog.exit(f"subscribe error") - + time.sleep(5) index = 0 try: while True: + print(consumer) res = consumer.poll(1) + print(res) if not res: + print(11323) break val = res.value() + print(12) if val is None: continue for block in val: @@ -229,27 +451,28 @@ class TDTestCase: if index == 0: if len(data) != 6: tdLog.exit(f"fetchall len != 6") - # if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 10, 1), - # (datetime(2022, 11, 22, 12, 41, 9, 68000), 16, 1), - # (datetime(2022, 11, 22, 12, 41, 9, 69000), 0, 1), - # (datetime(2022, 11, 22, 12, 41, 9, 69000), 1, 1), - # (datetime(2022, 11, 22, 12, 41, 9, 69000), 2, 1), - # (datetime(2022, 11, 22, 12, 41, 9, 69000), 3, 1)]: - # tdLog.exit(f"data error") - if index == 1: - if len(data) != 4: - tdLog.exit(f"fetchall len != 4") - # if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 10, 1), - # (datetime(2022, 11, 22, 12, 41, 9, 69000), 5, 1), - # (datetime(2022, 11, 22, 12, 41, 9, 69000), 12, 1), - # (datetime(2022, 11, 22, 12, 41, 9, 69000), 7, 1)]: - # tdLog.exit(f"data error") + if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 'from', 1), + (datetime(2022, 11, 22, 12, 41, 9, 68000), 'it', 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 'him', 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 'like', 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 'she', 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 'value', 1)]: + tdLog.exit(f"data error") + if index >= 1: + if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), "10", 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), "5", 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), "12", 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), "7", 1)]: + tdLog.exit(f"data error") index += 1 + print("index:" + str(index)) finally: consumer.close() def run(self): + # self.primaryKeyTestIntQuery() + # self.primaryKeyTestIntStable() # self.primaryKeyTestInt() self.primaryKeyTestString()