From 14531fbf7bd699397e70c0016736fb01a6c2673e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 11 Apr 2024 16:20:28 +0800 Subject: [PATCH] fix:filter data error & add test case --- source/common/src/tmsg.c | 10 +- source/dnode/mnode/impl/src/mndSubscribe.c | 2 +- source/libs/executor/src/executor.c | 1 - source/libs/executor/src/scanoperator.c | 4 +- tests/system-test/7-tmq/tmq_primary_key.py | 546 +++++++-------------- 5 files changed, 184 insertions(+), 379 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 5ad5fef1f5..2705c6d5dc 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -8730,7 +8730,15 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { } else if (pVal->type == TMQ_OFFSET__LOG) { snprintf(buf, maxLen, "wal:%" PRId64, pVal->version); } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) { - snprintf(buf, maxLen, "tsdb:%" PRId64 "|%" PRId64, pVal->uid, pVal->ts); + if(IS_VAR_DATA_TYPE(pVal->primaryKey.type)) { + char *tmp = taosMemoryCalloc(1, pVal->primaryKey.nData + 1); + if (tmp == NULL) return TSDB_CODE_OUT_OF_MEMORY; + memcpy(tmp, pVal->primaryKey.pData, pVal->primaryKey.nData); + snprintf(buf, maxLen, "tsdb:%" PRId64 "|%" PRId64 ",pk type:%d,val:%s", pVal->uid, pVal->ts, pVal->primaryKey.type, tmp); + taosMemoryFree(tmp); + }else{ + snprintf(buf, maxLen, "tsdb:%" PRId64 "|%" PRId64 ",pk type:%d,val:%" PRId64, pVal->uid, pVal->ts, pVal->primaryKey.type, pVal->primaryKey.val); + } } else { return TSDB_CODE_INVALID_PARA; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index a6d7a24323..c875deb972 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -1311,6 +1311,7 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t cons for(int i = 0; i < taosArrayGetSize(offsetRows); i++){ OffsetRows *tmp = taosArrayGet(offsetRows, i); if(tmp->vgId != pVgEp->vgId){ + mError("mnd show subscriptions: do not find vgId:%d, %d in offsetRows", tmp->vgId, pVgEp->vgId); continue; } data = tmp; @@ -1374,7 +1375,6 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, topic, cgroup, pConsumerEp->vgs, pConsumerEp->offsetRows); } - // do not show for cleared subscription buildResult(pBlock, &numOfRows, -1, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows); pBlock->info.rows = numOfRows; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 2364c84aaf..f3da768eb9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1256,7 +1256,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } } else { // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB - if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { SStreamRawScanInfo* pInfo = pOperator->info; SSnapContext* sContext = pInfo->sContext; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3bf62ed933..58ef2a1bd3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2097,10 +2097,10 @@ static void doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal *offse void *tmq = taosMemoryMalloc(offset->primaryKey.nData + VARSTR_HEADER_SIZE); memcpy(varDataVal(tmq), offset->primaryKey.pData, offset->primaryKey.nData); varDataLen(tmq) = offset->primaryKey.nData; - p[i] = (*ts >= offset->ts) && (func(data, tmq) > 0); + p[i] = (*ts > offset->ts) || (func(data, tmq) > 0); taosMemoryFree(tmq); }else{ - p[i] = (*ts >= offset->ts) && (func(data, &offset->primaryKey.val) > 0); + p[i] = (*ts > offset->ts) || (func(data, &offset->primaryKey.val) > 0); } if (!p[i]) { diff --git a/tests/system-test/7-tmq/tmq_primary_key.py b/tests/system-test/7-tmq/tmq_primary_key.py index 27a453eac6..a14442bbf8 100644 --- a/tests/system-test/7-tmq/tmq_primary_key.py +++ b/tests/system-test/7-tmq/tmq_primary_key.py @@ -1,4 +1,3 @@ - import taos import sys import time @@ -12,372 +11,50 @@ from util.cases import * from util.dnodes import * from util.common import * from taos.tmq import * +from util.dnodes import * +import datetime + sys.path.append("./7-tmq") from tmqCommon import * + class TDTestCase: + clientCfgDict = {'debugFlag': 135} + updatecfgDict = {'debugFlag': 135, 'asynclog': 0} + updatecfgDict["clientCfg"] = clientCfgDict + def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) - #tdSql.init(conn.cursor(), logSql) # output sql.txt file - def checkJson(self, cfgPath, name): - srcFile = '%s/../log/%s.source'%(cfgPath, name) - dstFile = '%s/../log/%s.result'%(cfgPath, name) - tdLog.info("compare file: %s, %s"%(srcFile, dstFile)) + def primaryKeyTestInt(self): + print("==============Case 1: primary key test int") + tdSql.execute(f'create database if not exists abc1 vgroups 1 wal_retention_period 3600;') + tdSql.execute(f'use abc1;') + tdSql.execute(f'create table if not exists pk (ts timestamp, c1 int primary key, c2 int);') + tdSql.execute(f'insert into pk values(1669092069068, 0, 1);') + tdSql.execute(f'insert into pk values(1669092069068, 6, 1);') + tdSql.execute(f'flush database abc1') - consumeFile = open(srcFile, mode='r') - queryFile = open(dstFile, mode='r') + tdSql.execute(f'insert into pk values(1669092069069, 0, 1) (1669092069069, 1, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 2, 1) (1669092069069, 3, 1);') + tdSql.execute(f'insert into pk values(1669092069068, 10, 1) (1669092069068, 16, 1);') - while True: - dst = queryFile.readline() - src = consumeFile.readline() - if src: - if dst != src: - tdLog.exit("compare error: %s != %s"%(src, dst)) - else: - break - return + tdSql.execute(f'create topic topic_in with meta as database abc1') - def checkDropData(self, drop): - tdSql.execute('use db_taosx') - tdSql.query("show tables") - if drop: - tdSql.checkRows(11) - else: - tdSql.checkRows(16) - tdSql.query("select * from jt order by i") - tdSql.checkRows(2) - tdSql.checkData(0, 1, 1) - tdSql.checkData(1, 1, 11) - tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}') - tdSql.checkData(1, 2, None) - - tdSql.query("select * from sttb order by ts") - tdSql.checkRows(2) - tdSql.checkData(0, 1, 13) - tdSql.checkData(1, 1, 16) - tdSql.checkData(0, 2, 22) - tdSql.checkData(1, 2, 25) - tdSql.checkData(0, 5, "sttb3") - tdSql.checkData(1, 5, "sttb4") - - tdSql.query("select * from stt order by ts") - tdSql.checkRows(3) - tdSql.checkData(0, 1, 1) - tdSql.checkData(2, 1, 21) - tdSql.checkData(0, 2, 2) - tdSql.checkData(2, 2, 21) - tdSql.checkData(0, 5, "stt3") - tdSql.checkData(2, 5, "stt4") - - tdSql.execute('use abc1') - tdSql.query("show tables") - if drop: - tdSql.checkRows(11) - else: - tdSql.checkRows(16) - tdSql.query("select * from jt order by i") - tdSql.checkRows(2) - tdSql.checkData(0, 1, 1) - tdSql.checkData(1, 1, 11) - tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}') - tdSql.checkData(1, 2, None) - - tdSql.query("select * from sttb order by ts") - tdSql.checkRows(2) - tdSql.checkData(0, 1, 13) - tdSql.checkData(1, 1, 16) - tdSql.checkData(0, 2, 22) - tdSql.checkData(1, 2, 25) - tdSql.checkData(0, 5, "sttb3") - tdSql.checkData(1, 5, "sttb4") - - tdSql.query("select * from stt order by ts") - tdSql.checkRows(3) - tdSql.checkData(0, 1, 1) - tdSql.checkData(2, 1, 21) - tdSql.checkData(0, 2, 2) - tdSql.checkData(2, 2, 21) - tdSql.checkData(0, 5, "stt3") - tdSql.checkData(2, 5, "stt4") - - return - - def checkDataTable(self): - tdSql.execute('use db_taosx') - tdSql.query("select * from meters_summary") - tdSql.checkRows(1) - tdSql.checkData(0, 1, 120) - tdSql.checkData(0, 2, 1) - tdSql.checkData(0, 3, "San Francisco") - - tdSql.execute('use abc1') - tdSql.query("select * from meters_summary") - tdSql.checkRows(1) - tdSql.checkData(0, 1, 120) - tdSql.checkData(0, 2, 1) - tdSql.checkData(0, 3, "San Francisco") - - return - - def checkData(self): - tdSql.execute('use db_taosx') - tdSql.query("select * from tb1") - tdSql.checkRows(1) - tdSql.checkData(0, 1, 0) - tdSql.checkData(0, 2, 1) - - tdSql.query("select * from ct3 order by c1 desc") - tdSql.checkRows(2) - tdSql.checkData(0, 1, 51) - tdSql.checkData(0, 4, 940) - tdSql.checkData(1, 1, 23) - tdSql.checkData(1, 4, None) - - tdSql.query("select * from st1 order by ts") - tdSql.checkRows(8) - tdSql.checkData(0, 1, 1) - tdSql.checkData(1, 1, 3) - tdSql.checkData(4, 1, 4) - tdSql.checkData(6, 1, 23) - - tdSql.checkData(0, 2, 2) - tdSql.checkData(1, 2, 4) - tdSql.checkData(4, 2, 3) - tdSql.checkData(6, 2, 32) - - tdSql.checkData(0, 3, 'a') - tdSql.checkData(1, 3, 'b') - tdSql.checkData(4, 3, 'hwj') - tdSql.checkData(6, 3, 's21ds') - - tdSql.checkData(0, 4, None) - tdSql.checkData(1, 4, None) - tdSql.checkData(5, 4, 940) - tdSql.checkData(6, 4, None) - - tdSql.checkData(0, 5, 1000) - tdSql.checkData(1, 5, 2000) - tdSql.checkData(4, 5, 1000) - tdSql.checkData(6, 5, 5000) - - tdSql.checkData(0, 6, 'ttt') - tdSql.checkData(1, 6, None) - tdSql.checkData(4, 6, 'ttt') - tdSql.checkData(6, 6, None) - - tdSql.checkData(0, 7, True) - tdSql.checkData(1, 7, None) - tdSql.checkData(4, 7, True) - tdSql.checkData(6, 7, None) - - tdSql.checkData(0, 8, None) - tdSql.checkData(1, 8, None) - tdSql.checkData(4, 8, None) - tdSql.checkData(6, 8, None) - - tdSql.query("select * from ct1") - tdSql.checkRows(4) - - tdSql.query("select * from ct2") - tdSql.checkRows(0) - - tdSql.query("select * from ct0 order by c1") - tdSql.checkRows(2) - tdSql.checkData(0, 3, "a") - tdSql.checkData(1, 4, None) - - tdSql.query("select * from n1 order by cc3 desc") - tdSql.checkRows(2) - tdSql.checkData(0, 1, "eeee") - tdSql.checkData(1, 2, 940) - - tdSql.query("select * from jt order by i desc") - tdSql.checkRows(2) - tdSql.checkData(0, 1, 11) - tdSql.checkData(0, 2, None) - tdSql.checkData(1, 1, 1) - tdSql.checkData(1, 2, '{"k1":1,"k2":"hello"}') - - time.sleep(10) - tdSql.query("select * from information_schema.ins_tables where table_name = 'stt4'") - uid1 = tdSql.getData(0, 5) - uid2 = tdSql.getData(1, 5) - tdSql.checkNotEqual(uid1, uid2) - return - - def checkWal1Vgroup(self): - buildPath = tdCom.getBuildPath() - cfgPath = tdCom.getClientCfgPath() - cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1'%(buildPath, cfgPath) - tdLog.info(cmdStr) - os.system(cmdStr) - - self.checkJson(cfgPath, "tmq_taosx_tmp") - self.checkData() - self.checkDropData(False) - - return - - def checkWal1VgroupOnlyMeta(self): - buildPath = tdCom.getBuildPath() - cfgPath = tdCom.getClientCfgPath() - cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -d -onlymeta'%(buildPath, cfgPath) - tdLog.info(cmdStr) - os.system(cmdStr) - - self.checkJson(cfgPath, "tmq_taosx_tmp") - - return - - def checkWal1VgroupTable(self): - buildPath = tdCom.getBuildPath() - cfgPath = tdCom.getClientCfgPath() - cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -t'%(buildPath, cfgPath) - tdLog.info(cmdStr) - os.system(cmdStr) - - self.checkJson(cfgPath, "tmq_taosx_tmp") - self.checkDataTable() - - return - - def checkWalMultiVgroups(self): - buildPath = tdCom.getBuildPath() - cmdStr = '%s/build/bin/tmq_taosx_ci -sv 3 -dv 5'%(buildPath) - tdLog.info(cmdStr) - os.system(cmdStr) - - self.checkData() - self.checkDropData(False) - - return - - def checkWalMultiVgroupsWithDropTable(self): - buildPath = tdCom.getBuildPath() - cmdStr = '%s/build/bin/tmq_taosx_ci -sv 3 -dv 5 -d'%(buildPath) - tdLog.info(cmdStr) - os.system(cmdStr) - - self.checkDropData(True) - - return - - def checkSnapshot1Vgroup(self): - buildPath = tdCom.getBuildPath() - cfgPath = tdCom.getClientCfgPath() - cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -s'%(buildPath, cfgPath) - tdLog.info(cmdStr) - os.system(cmdStr) - - self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot") - self.checkData() - self.checkDropData(False) - - return - - def checkSnapshot1VgroupTable(self): - buildPath = tdCom.getBuildPath() - cfgPath = tdCom.getClientCfgPath() - cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -s -t'%(buildPath, cfgPath) - tdLog.info(cmdStr) - os.system(cmdStr) - - self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot") - self.checkDataTable() - - return - - def checkSnapshotMultiVgroups(self): - buildPath = tdCom.getBuildPath() - cmdStr = '%s/build/bin/tmq_taosx_ci -sv 2 -dv 4 -s'%(buildPath) - tdLog.info(cmdStr) - os.system(cmdStr) - - self.checkData() - self.checkDropData(False) - - return - - def checkSnapshotMultiVgroupsWithDropTable(self): - buildPath = tdCom.getBuildPath() - cmdStr = '%s/build/bin/tmq_taosx_ci -sv 2 -dv 4 -s -d'%(buildPath) - tdLog.info(cmdStr) - os.system(cmdStr) - - self.checkDropData(True) - - return - - def primaryKeyTest(self): - tdSql.execute(f'create database if not exists d1') - tdSql.execute(f'use d1') - tdSql.execute(f'create table st(ts timestamp, i int primary key, j varchar(8)) tags(t int)') - tdSql.execute(f'insert into t1 using st tags(1) values(1711098732000, 1) (1711098732000, 2)') - tdSql.execute(f'insert into t2 using st tags(2) values(1711098732001, -2) (1711098732001, -1)') - tdSql.execute(f'insert into t2 using st tags(2) values(1711098732001, 1) (1711098732001, 2)') - tdSql.execute(f'insert into t3 using st tags(3) values(1711098732002, 11) (1711098732002, 12)') - - tdSql.query("select * from st") - tdSql.checkRows(8) - - tdSql.execute(f'create topic topic_all with meta as database d1') consumer_dict = { "group.id": "g1", "td.connect.user": "root", "td.connect.pass": "taosdata", "auto.offset.reset": "earliest", + "enable.auto.commit": "false", "experimental.snapshot.enable": "true", } consumer = Consumer(consumer_dict) try: - consumer.subscribe(["topic_all"]) - except TmqError: - tdLog.exit(f"subscribe error") - - index = 0 - try: - while True: - res = consumer.poll(1) - if not res: - if index != 1: - tdLog.exit("consume error") - break - val = res.value() - if val is None: - continue - cnt = 0; - for block in val: - cnt += len(block.fetchall()) - - if cnt != 8: - tdLog.exit("consume error") - - index += 1 - finally: - consumer.close() - - def consume_TS_4540_Test(self): - tdSql.execute(f'create database if not exists test') - tdSql.execute(f'use test') - tdSql.execute(f'CREATE STABLE `test`.`b` ( `time` TIMESTAMP , `task_id` NCHAR(1000) ) TAGS( `key` NCHAR(1000))') - tdSql.execute(f"insert into `test`.b1 using `test`.`b`(`key`) tags('1') (time, task_id) values ('2024-03-04 12:50:01.000', '32') `test`.b2 using `test`.`b`(`key`) tags('2') (time, task_id) values ('2024-03-04 12:50:01.000', '43') `test`.b3 using `test`.`b`(`key`) tags('3') (time, task_id) values ('2024-03-04 12:50:01.000', '123456')") - - tdSql.execute(f'create topic tt as select tbname,task_id,`key` from b') - - consumer_dict = { - "group.id": "g1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "auto.offset.reset": "earliest", - } - consumer = Consumer(consumer_dict) - - try: - consumer.subscribe(["tt"]) + consumer.subscribe(["topic_in"]) except TmqError: tdLog.exit(f"subscribe error") @@ -391,30 +68,33 @@ class TDTestCase: continue for block in val: data = block.fetchall() - print(data) - if data != [('b1', '32', '1')] and data != [('b2', '43', '2')] and data != [('b3', '123456', '3')]: - tdLog.exit(f"index = 0 table b1 error") + for element in data: + print(element) + if len(data) != 2: + tdLog.exit(f"fetchall len != 2") + if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 0, 1), + (datetime(2022, 11, 22, 12, 41, 9, 68000), 6, 1)]: + tdLog.exit(f"data error") + consumer.commit(res) + break finally: consumer.close() - - def consume_ts_4544(self): - tdSql.execute(f'create database if not exists d1') - tdSql.execute(f'use d1') - tdSql.execute(f'create table stt(ts timestamp, i int) tags(t int)') - tdSql.execute(f'insert into tt1 using stt tags(1) values(now, 1) (now+1s, 2)') - tdSql.execute(f'insert into tt2 using stt tags(2) values(now, 1) (now+1s, 2)') - tdSql.execute(f'insert into tt3 using stt tags(3) values(now, 1) (now+1s, 2)') - tdSql.execute(f'insert into tt1 using stt tags(1) values(now+5s, 11) (now+10s, 12)') - tdSql.execute(f'create topic topic_in as select * from stt where tbname in ("tt2")') + tdSql.query(f'show subscriptions;') + sub = tdSql.getData(0, 4); + print(sub) + if not sub.startswith("tsdb"): + tdLog.exit(f"show subscriptions error") + + tdSql.execute(f'use abc1;') + tdSql.execute(f'insert into pk values(1669092069069, 10, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 5, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 12, 1);') + tdSql.execute(f'insert into pk values(1669092069069, 7, 1);') + + tdSql.execute(f'flush database abc1') - consumer_dict = { - "group.id": "g1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "auto.offset.reset": "earliest", - } consumer = Consumer(consumer_dict) try: @@ -422,22 +102,66 @@ class TDTestCase: except TmqError: tdLog.exit(f"subscribe error") - consumer.close() + index = 0 + try: + while True: + res = consumer.poll(1) + if not res: + break + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + for element in data: + print(element) + if index == 0: + if len(data) != 6: + tdLog.exit(f"fetchall len != 6") + if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 10, 1), + (datetime(2022, 11, 22, 12, 41, 9, 68000), 16, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 0, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 1, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 2, 1), + (datetime(2022, 11, 22, 12, 41, 9, 69000), 3, 1)]: + tdLog.exit(f"data error") + if index == 1: + if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 10, 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 5, 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 12, 1)] \ + and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 7, 1)]: + tdLog.exit(f"data error") + index += 1 + finally: + consumer.close() - def consume_ts_4551(self): - tdSql.execute(f'use d1') + def primaryKeyTestString(self): + print("==============Case 2: primary key test string") + tdSql.execute(f'create database if not exists db_pk_string vgroups 1 wal_retention_period 3600;') + tdSql.execute(f'use db_pk_string;') + tdSql.execute(f'create table if not exists pk (ts timestamp, c1 varchar(64) primary key, c2 int);') + tdSql.execute(f'insert into pk values(1669092069068, "hello", 1);') + tdSql.execute(f'insert into pk values(1669092069068, "word", 1);') + tdSql.execute(f'flush database db_pk_string') + + tdSql.execute(f'insert into pk values(1669092069069, "him", 1) (1669092069069, "value", 1);') + tdSql.execute(f'insert into pk values(1669092069069, "she", 1) (1669092069069, "like", 1);') + tdSql.execute(f'insert into pk values(1669092069068, "from", 1) (1669092069068, "it", 1);') + + tdSql.execute(f'create topic topic_pk_string with meta as database db_pk_string') - tdSql.execute(f'create topic topic_stable as stable stt where tbname like "t%"') consumer_dict = { "group.id": "g1", "td.connect.user": "root", "td.connect.pass": "taosdata", "auto.offset.reset": "earliest", + "enable.auto.commit": "false", + "experimental.snapshot.enable": "true", } consumer = Consumer(consumer_dict) try: - consumer.subscribe(["topic_stable"]) + consumer.subscribe(["topic_pk_string"]) except TmqError: tdLog.exit(f"subscribe error") @@ -446,19 +170,93 @@ class TDTestCase: res = consumer.poll(1) if not res: break + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + for element in data: + print(element) + if len(data) != 2: + tdLog.exit(f"fetchall len != 2") + # if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), "hello", 1), + # (datetime(2022, 11, 22, 12, 41, 9, 68000), "world", 1)]: + # tdLog.exit(f"data error") + + consumer.commit(res) + break + finally: + consumer.close() + + tdSql.query(f'show subscriptions;') + sub = tdSql.getData(0, 4); + print(sub) + if not sub.startswith("tsdb"): + tdLog.exit(f"show subscriptions error") + + tdDnodes.stop(1) + time.sleep(2) + tdDnodes.start(1) + + tdSql.execute(f'use db_pk_string;') + tdSql.execute(f'insert into pk values(1669092069069, "10", 1);') + tdSql.execute(f'insert into pk values(1669092069069, "5", 1);') + tdSql.execute(f'insert into pk values(1669092069069, "12", 1);') + tdSql.execute(f'insert into pk values(1669092069069, "7", 1);') + + tdSql.execute(f'flush database db_pk_string') + + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["topic_pk_string"]) + except TmqError: + tdLog.exit(f"subscribe error") + + index = 0 + try: + while True: + res = consumer.poll(1) + if not res: + break + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + for element in data: + print(element) + if index == 0: + if len(data) != 6: + tdLog.exit(f"fetchall len != 6") + # if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 10, 1), + # (datetime(2022, 11, 22, 12, 41, 9, 68000), 16, 1), + # (datetime(2022, 11, 22, 12, 41, 9, 69000), 0, 1), + # (datetime(2022, 11, 22, 12, 41, 9, 69000), 1, 1), + # (datetime(2022, 11, 22, 12, 41, 9, 69000), 2, 1), + # (datetime(2022, 11, 22, 12, 41, 9, 69000), 3, 1)]: + # tdLog.exit(f"data error") + if index == 1: + if len(data) != 4: + tdLog.exit(f"fetchall len != 4") + # if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 10, 1), + # (datetime(2022, 11, 22, 12, 41, 9, 69000), 5, 1), + # (datetime(2022, 11, 22, 12, 41, 9, 69000), 12, 1), + # (datetime(2022, 11, 22, 12, 41, 9, 69000), 7, 1)]: + # tdLog.exit(f"data error") + + index += 1 finally: consumer.close() - print("consume_ts_4551 ok") def run(self): - self.consumeTest() - self.consume_ts_4544() - self.consume_ts_4551() - self.consume_TS_4540_Test() + # self.primaryKeyTestInt() + self.primaryKeyTestString() def stop(self): tdSql.close() tdLog.success(f"{__file__} successfully executed") + tdCases.addLinux(__file__, TDTestCase()) tdCases.addWindows(__file__, TDTestCase())