fix:filter data error & add test case

This commit is contained in:
wangmm0220 2024-04-11 16:20:28 +08:00
parent 539d1f441f
commit 14531fbf7b
5 changed files with 184 additions and 379 deletions

View File

@ -8730,7 +8730,15 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
} else if (pVal->type == TMQ_OFFSET__LOG) { } else if (pVal->type == TMQ_OFFSET__LOG) {
snprintf(buf, maxLen, "wal:%" PRId64, pVal->version); snprintf(buf, maxLen, "wal:%" PRId64, pVal->version);
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) { } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
snprintf(buf, maxLen, "tsdb:%" PRId64 "|%" PRId64, pVal->uid, pVal->ts); if(IS_VAR_DATA_TYPE(pVal->primaryKey.type)) {
char *tmp = taosMemoryCalloc(1, pVal->primaryKey.nData + 1);
if (tmp == NULL) return TSDB_CODE_OUT_OF_MEMORY;
memcpy(tmp, pVal->primaryKey.pData, pVal->primaryKey.nData);
snprintf(buf, maxLen, "tsdb:%" PRId64 "|%" PRId64 ",pk type:%d,val:%s", pVal->uid, pVal->ts, pVal->primaryKey.type, tmp);
taosMemoryFree(tmp);
}else{
snprintf(buf, maxLen, "tsdb:%" PRId64 "|%" PRId64 ",pk type:%d,val:%" PRId64, pVal->uid, pVal->ts, pVal->primaryKey.type, pVal->primaryKey.val);
}
} else { } else {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }

View File

@ -1311,6 +1311,7 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t cons
for(int i = 0; i < taosArrayGetSize(offsetRows); i++){ for(int i = 0; i < taosArrayGetSize(offsetRows); i++){
OffsetRows *tmp = taosArrayGet(offsetRows, i); OffsetRows *tmp = taosArrayGet(offsetRows, i);
if(tmp->vgId != pVgEp->vgId){ if(tmp->vgId != pVgEp->vgId){
mError("mnd show subscriptions: do not find vgId:%d, %d in offsetRows", tmp->vgId, pVgEp->vgId);
continue; continue;
} }
data = tmp; data = tmp;
@ -1374,7 +1375,6 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, topic, cgroup, pConsumerEp->vgs, pConsumerEp->offsetRows); buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, topic, cgroup, pConsumerEp->vgs, pConsumerEp->offsetRows);
} }
// do not show for cleared subscription
buildResult(pBlock, &numOfRows, -1, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows); buildResult(pBlock, &numOfRows, -1, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows);
pBlock->info.rows = numOfRows; pBlock->info.rows = numOfRows;

View File

@ -1256,7 +1256,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} }
} else { // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB } else { // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
SStreamRawScanInfo* pInfo = pOperator->info; SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext; SSnapContext* sContext = pInfo->sContext;

View File

@ -2097,10 +2097,10 @@ static void doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal *offse
void *tmq = taosMemoryMalloc(offset->primaryKey.nData + VARSTR_HEADER_SIZE); void *tmq = taosMemoryMalloc(offset->primaryKey.nData + VARSTR_HEADER_SIZE);
memcpy(varDataVal(tmq), offset->primaryKey.pData, offset->primaryKey.nData); memcpy(varDataVal(tmq), offset->primaryKey.pData, offset->primaryKey.nData);
varDataLen(tmq) = offset->primaryKey.nData; varDataLen(tmq) = offset->primaryKey.nData;
p[i] = (*ts >= offset->ts) && (func(data, tmq) > 0); p[i] = (*ts > offset->ts) || (func(data, tmq) > 0);
taosMemoryFree(tmq); taosMemoryFree(tmq);
}else{ }else{
p[i] = (*ts >= offset->ts) && (func(data, &offset->primaryKey.val) > 0); p[i] = (*ts > offset->ts) || (func(data, &offset->primaryKey.val) > 0);
} }
if (!p[i]) { if (!p[i]) {

View File

@ -1,4 +1,3 @@
import taos import taos
import sys import sys
import time import time
@ -12,372 +11,50 @@ from util.cases import *
from util.dnodes import * from util.dnodes import *
from util.common import * from util.common import *
from taos.tmq import * from taos.tmq import *
from util.dnodes import *
import datetime
sys.path.append("./7-tmq") sys.path.append("./7-tmq")
from tmqCommon import * from tmqCommon import *
class TDTestCase: class TDTestCase:
clientCfgDict = {'debugFlag': 135}
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
updatecfgDict["clientCfg"] = clientCfgDict
def init(self, conn, logSql, replicaVar=1): def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar) self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor()) tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkJson(self, cfgPath, name): def primaryKeyTestInt(self):
srcFile = '%s/../log/%s.source'%(cfgPath, name) print("==============Case 1: primary key test int")
dstFile = '%s/../log/%s.result'%(cfgPath, name) tdSql.execute(f'create database if not exists abc1 vgroups 1 wal_retention_period 3600;')
tdLog.info("compare file: %s, %s"%(srcFile, dstFile)) tdSql.execute(f'use abc1;')
tdSql.execute(f'create table if not exists pk (ts timestamp, c1 int primary key, c2 int);')
tdSql.execute(f'insert into pk values(1669092069068, 0, 1);')
tdSql.execute(f'insert into pk values(1669092069068, 6, 1);')
tdSql.execute(f'flush database abc1')
consumeFile = open(srcFile, mode='r') tdSql.execute(f'insert into pk values(1669092069069, 0, 1) (1669092069069, 1, 1);')
queryFile = open(dstFile, mode='r') tdSql.execute(f'insert into pk values(1669092069069, 2, 1) (1669092069069, 3, 1);')
tdSql.execute(f'insert into pk values(1669092069068, 10, 1) (1669092069068, 16, 1);')
while True: tdSql.execute(f'create topic topic_in with meta as database abc1')
dst = queryFile.readline()
src = consumeFile.readline()
if src:
if dst != src:
tdLog.exit("compare error: %s != %s"%(src, dst))
else:
break
return
def checkDropData(self, drop):
tdSql.execute('use db_taosx')
tdSql.query("show tables")
if drop:
tdSql.checkRows(11)
else:
tdSql.checkRows(16)
tdSql.query("select * from jt order by i")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 11)
tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}')
tdSql.checkData(1, 2, None)
tdSql.query("select * from sttb order by ts")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 13)
tdSql.checkData(1, 1, 16)
tdSql.checkData(0, 2, 22)
tdSql.checkData(1, 2, 25)
tdSql.checkData(0, 5, "sttb3")
tdSql.checkData(1, 5, "sttb4")
tdSql.query("select * from stt order by ts")
tdSql.checkRows(3)
tdSql.checkData(0, 1, 1)
tdSql.checkData(2, 1, 21)
tdSql.checkData(0, 2, 2)
tdSql.checkData(2, 2, 21)
tdSql.checkData(0, 5, "stt3")
tdSql.checkData(2, 5, "stt4")
tdSql.execute('use abc1')
tdSql.query("show tables")
if drop:
tdSql.checkRows(11)
else:
tdSql.checkRows(16)
tdSql.query("select * from jt order by i")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 11)
tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}')
tdSql.checkData(1, 2, None)
tdSql.query("select * from sttb order by ts")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 13)
tdSql.checkData(1, 1, 16)
tdSql.checkData(0, 2, 22)
tdSql.checkData(1, 2, 25)
tdSql.checkData(0, 5, "sttb3")
tdSql.checkData(1, 5, "sttb4")
tdSql.query("select * from stt order by ts")
tdSql.checkRows(3)
tdSql.checkData(0, 1, 1)
tdSql.checkData(2, 1, 21)
tdSql.checkData(0, 2, 2)
tdSql.checkData(2, 2, 21)
tdSql.checkData(0, 5, "stt3")
tdSql.checkData(2, 5, "stt4")
return
def checkDataTable(self):
tdSql.execute('use db_taosx')
tdSql.query("select * from meters_summary")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 120)
tdSql.checkData(0, 2, 1)
tdSql.checkData(0, 3, "San Francisco")
tdSql.execute('use abc1')
tdSql.query("select * from meters_summary")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 120)
tdSql.checkData(0, 2, 1)
tdSql.checkData(0, 3, "San Francisco")
return
def checkData(self):
tdSql.execute('use db_taosx')
tdSql.query("select * from tb1")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 0)
tdSql.checkData(0, 2, 1)
tdSql.query("select * from ct3 order by c1 desc")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 51)
tdSql.checkData(0, 4, 940)
tdSql.checkData(1, 1, 23)
tdSql.checkData(1, 4, None)
tdSql.query("select * from st1 order by ts")
tdSql.checkRows(8)
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 3)
tdSql.checkData(4, 1, 4)
tdSql.checkData(6, 1, 23)
tdSql.checkData(0, 2, 2)
tdSql.checkData(1, 2, 4)
tdSql.checkData(4, 2, 3)
tdSql.checkData(6, 2, 32)
tdSql.checkData(0, 3, 'a')
tdSql.checkData(1, 3, 'b')
tdSql.checkData(4, 3, 'hwj')
tdSql.checkData(6, 3, 's21ds')
tdSql.checkData(0, 4, None)
tdSql.checkData(1, 4, None)
tdSql.checkData(5, 4, 940)
tdSql.checkData(6, 4, None)
tdSql.checkData(0, 5, 1000)
tdSql.checkData(1, 5, 2000)
tdSql.checkData(4, 5, 1000)
tdSql.checkData(6, 5, 5000)
tdSql.checkData(0, 6, 'ttt')
tdSql.checkData(1, 6, None)
tdSql.checkData(4, 6, 'ttt')
tdSql.checkData(6, 6, None)
tdSql.checkData(0, 7, True)
tdSql.checkData(1, 7, None)
tdSql.checkData(4, 7, True)
tdSql.checkData(6, 7, None)
tdSql.checkData(0, 8, None)
tdSql.checkData(1, 8, None)
tdSql.checkData(4, 8, None)
tdSql.checkData(6, 8, None)
tdSql.query("select * from ct1")
tdSql.checkRows(4)
tdSql.query("select * from ct2")
tdSql.checkRows(0)
tdSql.query("select * from ct0 order by c1")
tdSql.checkRows(2)
tdSql.checkData(0, 3, "a")
tdSql.checkData(1, 4, None)
tdSql.query("select * from n1 order by cc3 desc")
tdSql.checkRows(2)
tdSql.checkData(0, 1, "eeee")
tdSql.checkData(1, 2, 940)
tdSql.query("select * from jt order by i desc")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 11)
tdSql.checkData(0, 2, None)
tdSql.checkData(1, 1, 1)
tdSql.checkData(1, 2, '{"k1":1,"k2":"hello"}')
time.sleep(10)
tdSql.query("select * from information_schema.ins_tables where table_name = 'stt4'")
uid1 = tdSql.getData(0, 5)
uid2 = tdSql.getData(1, 5)
tdSql.checkNotEqual(uid1, uid2)
return
def checkWal1Vgroup(self):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1'%(buildPath, cfgPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkJson(cfgPath, "tmq_taosx_tmp")
self.checkData()
self.checkDropData(False)
return
def checkWal1VgroupOnlyMeta(self):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -d -onlymeta'%(buildPath, cfgPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkJson(cfgPath, "tmq_taosx_tmp")
return
def checkWal1VgroupTable(self):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -t'%(buildPath, cfgPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkJson(cfgPath, "tmq_taosx_tmp")
self.checkDataTable()
return
def checkWalMultiVgroups(self):
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -sv 3 -dv 5'%(buildPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkData()
self.checkDropData(False)
return
def checkWalMultiVgroupsWithDropTable(self):
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -sv 3 -dv 5 -d'%(buildPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkDropData(True)
return
def checkSnapshot1Vgroup(self):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -s'%(buildPath, cfgPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot")
self.checkData()
self.checkDropData(False)
return
def checkSnapshot1VgroupTable(self):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -s -t'%(buildPath, cfgPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot")
self.checkDataTable()
return
def checkSnapshotMultiVgroups(self):
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -sv 2 -dv 4 -s'%(buildPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkData()
self.checkDropData(False)
return
def checkSnapshotMultiVgroupsWithDropTable(self):
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -sv 2 -dv 4 -s -d'%(buildPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkDropData(True)
return
def primaryKeyTest(self):
tdSql.execute(f'create database if not exists d1')
tdSql.execute(f'use d1')
tdSql.execute(f'create table st(ts timestamp, i int primary key, j varchar(8)) tags(t int)')
tdSql.execute(f'insert into t1 using st tags(1) values(1711098732000, 1) (1711098732000, 2)')
tdSql.execute(f'insert into t2 using st tags(2) values(1711098732001, -2) (1711098732001, -1)')
tdSql.execute(f'insert into t2 using st tags(2) values(1711098732001, 1) (1711098732001, 2)')
tdSql.execute(f'insert into t3 using st tags(3) values(1711098732002, 11) (1711098732002, 12)')
tdSql.query("select * from st")
tdSql.checkRows(8)
tdSql.execute(f'create topic topic_all with meta as database d1')
consumer_dict = { consumer_dict = {
"group.id": "g1", "group.id": "g1",
"td.connect.user": "root", "td.connect.user": "root",
"td.connect.pass": "taosdata", "td.connect.pass": "taosdata",
"auto.offset.reset": "earliest", "auto.offset.reset": "earliest",
"enable.auto.commit": "false",
"experimental.snapshot.enable": "true", "experimental.snapshot.enable": "true",
} }
consumer = Consumer(consumer_dict) consumer = Consumer(consumer_dict)
try: try:
consumer.subscribe(["topic_all"]) consumer.subscribe(["topic_in"])
except TmqError:
tdLog.exit(f"subscribe error")
index = 0
try:
while True:
res = consumer.poll(1)
if not res:
if index != 1:
tdLog.exit("consume error")
break
val = res.value()
if val is None:
continue
cnt = 0;
for block in val:
cnt += len(block.fetchall())
if cnt != 8:
tdLog.exit("consume error")
index += 1
finally:
consumer.close()
def consume_TS_4540_Test(self):
tdSql.execute(f'create database if not exists test')
tdSql.execute(f'use test')
tdSql.execute(f'CREATE STABLE `test`.`b` ( `time` TIMESTAMP , `task_id` NCHAR(1000) ) TAGS( `key` NCHAR(1000))')
tdSql.execute(f"insert into `test`.b1 using `test`.`b`(`key`) tags('1') (time, task_id) values ('2024-03-04 12:50:01.000', '32') `test`.b2 using `test`.`b`(`key`) tags('2') (time, task_id) values ('2024-03-04 12:50:01.000', '43') `test`.b3 using `test`.`b`(`key`) tags('3') (time, task_id) values ('2024-03-04 12:50:01.000', '123456')")
tdSql.execute(f'create topic tt as select tbname,task_id,`key` from b')
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
}
consumer = Consumer(consumer_dict)
try:
consumer.subscribe(["tt"])
except TmqError: except TmqError:
tdLog.exit(f"subscribe error") tdLog.exit(f"subscribe error")
@ -391,30 +68,33 @@ class TDTestCase:
continue continue
for block in val: for block in val:
data = block.fetchall() data = block.fetchall()
print(data) for element in data:
if data != [('b1', '32', '1')] and data != [('b2', '43', '2')] and data != [('b3', '123456', '3')]: print(element)
tdLog.exit(f"index = 0 table b1 error") if len(data) != 2:
tdLog.exit(f"fetchall len != 2")
if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 0, 1),
(datetime(2022, 11, 22, 12, 41, 9, 68000), 6, 1)]:
tdLog.exit(f"data error")
consumer.commit(res)
break
finally: finally:
consumer.close() consumer.close()
def consume_ts_4544(self): tdSql.query(f'show subscriptions;')
tdSql.execute(f'create database if not exists d1') sub = tdSql.getData(0, 4);
tdSql.execute(f'use d1') print(sub)
tdSql.execute(f'create table stt(ts timestamp, i int) tags(t int)') if not sub.startswith("tsdb"):
tdSql.execute(f'insert into tt1 using stt tags(1) values(now, 1) (now+1s, 2)') tdLog.exit(f"show subscriptions error")
tdSql.execute(f'insert into tt2 using stt tags(2) values(now, 1) (now+1s, 2)')
tdSql.execute(f'insert into tt3 using stt tags(3) values(now, 1) (now+1s, 2)')
tdSql.execute(f'insert into tt1 using stt tags(1) values(now+5s, 11) (now+10s, 12)')
tdSql.execute(f'create topic topic_in as select * from stt where tbname in ("tt2")') tdSql.execute(f'use abc1;')
tdSql.execute(f'insert into pk values(1669092069069, 10, 1);')
tdSql.execute(f'insert into pk values(1669092069069, 5, 1);')
tdSql.execute(f'insert into pk values(1669092069069, 12, 1);')
tdSql.execute(f'insert into pk values(1669092069069, 7, 1);')
tdSql.execute(f'flush database abc1')
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
}
consumer = Consumer(consumer_dict) consumer = Consumer(consumer_dict)
try: try:
@ -422,22 +102,66 @@ class TDTestCase:
except TmqError: except TmqError:
tdLog.exit(f"subscribe error") tdLog.exit(f"subscribe error")
index = 0
try:
while True:
res = consumer.poll(1)
if not res:
break
val = res.value()
if val is None:
continue
for block in val:
data = block.fetchall()
for element in data:
print(element)
if index == 0:
if len(data) != 6:
tdLog.exit(f"fetchall len != 6")
if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 10, 1),
(datetime(2022, 11, 22, 12, 41, 9, 68000), 16, 1),
(datetime(2022, 11, 22, 12, 41, 9, 69000), 0, 1),
(datetime(2022, 11, 22, 12, 41, 9, 69000), 1, 1),
(datetime(2022, 11, 22, 12, 41, 9, 69000), 2, 1),
(datetime(2022, 11, 22, 12, 41, 9, 69000), 3, 1)]:
tdLog.exit(f"data error")
if index == 1:
if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 10, 1)] \
and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 5, 1)] \
and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 12, 1)] \
and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 7, 1)]:
tdLog.exit(f"data error")
index += 1
finally:
consumer.close() consumer.close()
def consume_ts_4551(self): def primaryKeyTestString(self):
tdSql.execute(f'use d1') print("==============Case 2: primary key test string")
tdSql.execute(f'create database if not exists db_pk_string vgroups 1 wal_retention_period 3600;')
tdSql.execute(f'use db_pk_string;')
tdSql.execute(f'create table if not exists pk (ts timestamp, c1 varchar(64) primary key, c2 int);')
tdSql.execute(f'insert into pk values(1669092069068, "hello", 1);')
tdSql.execute(f'insert into pk values(1669092069068, "word", 1);')
tdSql.execute(f'flush database db_pk_string')
tdSql.execute(f'insert into pk values(1669092069069, "him", 1) (1669092069069, "value", 1);')
tdSql.execute(f'insert into pk values(1669092069069, "she", 1) (1669092069069, "like", 1);')
tdSql.execute(f'insert into pk values(1669092069068, "from", 1) (1669092069068, "it", 1);')
tdSql.execute(f'create topic topic_pk_string with meta as database db_pk_string')
tdSql.execute(f'create topic topic_stable as stable stt where tbname like "t%"')
consumer_dict = { consumer_dict = {
"group.id": "g1", "group.id": "g1",
"td.connect.user": "root", "td.connect.user": "root",
"td.connect.pass": "taosdata", "td.connect.pass": "taosdata",
"auto.offset.reset": "earliest", "auto.offset.reset": "earliest",
"enable.auto.commit": "false",
"experimental.snapshot.enable": "true",
} }
consumer = Consumer(consumer_dict) consumer = Consumer(consumer_dict)
try: try:
consumer.subscribe(["topic_stable"]) consumer.subscribe(["topic_pk_string"])
except TmqError: except TmqError:
tdLog.exit(f"subscribe error") tdLog.exit(f"subscribe error")
@ -446,19 +170,93 @@ class TDTestCase:
res = consumer.poll(1) res = consumer.poll(1)
if not res: if not res:
break break
val = res.value()
if val is None:
continue
for block in val:
data = block.fetchall()
for element in data:
print(element)
if len(data) != 2:
tdLog.exit(f"fetchall len != 2")
# if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), "hello", 1),
# (datetime(2022, 11, 22, 12, 41, 9, 68000), "world", 1)]:
# tdLog.exit(f"data error")
consumer.commit(res)
break
finally:
consumer.close()
tdSql.query(f'show subscriptions;')
sub = tdSql.getData(0, 4);
print(sub)
if not sub.startswith("tsdb"):
tdLog.exit(f"show subscriptions error")
tdDnodes.stop(1)
time.sleep(2)
tdDnodes.start(1)
tdSql.execute(f'use db_pk_string;')
tdSql.execute(f'insert into pk values(1669092069069, "10", 1);')
tdSql.execute(f'insert into pk values(1669092069069, "5", 1);')
tdSql.execute(f'insert into pk values(1669092069069, "12", 1);')
tdSql.execute(f'insert into pk values(1669092069069, "7", 1);')
tdSql.execute(f'flush database db_pk_string')
consumer = Consumer(consumer_dict)
try:
consumer.subscribe(["topic_pk_string"])
except TmqError:
tdLog.exit(f"subscribe error")
index = 0
try:
while True:
res = consumer.poll(1)
if not res:
break
val = res.value()
if val is None:
continue
for block in val:
data = block.fetchall()
for element in data:
print(element)
if index == 0:
if len(data) != 6:
tdLog.exit(f"fetchall len != 6")
# if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 10, 1),
# (datetime(2022, 11, 22, 12, 41, 9, 68000), 16, 1),
# (datetime(2022, 11, 22, 12, 41, 9, 69000), 0, 1),
# (datetime(2022, 11, 22, 12, 41, 9, 69000), 1, 1),
# (datetime(2022, 11, 22, 12, 41, 9, 69000), 2, 1),
# (datetime(2022, 11, 22, 12, 41, 9, 69000), 3, 1)]:
# tdLog.exit(f"data error")
if index == 1:
if len(data) != 4:
tdLog.exit(f"fetchall len != 4")
# if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 10, 1),
# (datetime(2022, 11, 22, 12, 41, 9, 69000), 5, 1),
# (datetime(2022, 11, 22, 12, 41, 9, 69000), 12, 1),
# (datetime(2022, 11, 22, 12, 41, 9, 69000), 7, 1)]:
# tdLog.exit(f"data error")
index += 1
finally: finally:
consumer.close() consumer.close()
print("consume_ts_4551 ok")
def run(self): def run(self):
self.consumeTest() # self.primaryKeyTestInt()
self.consume_ts_4544() self.primaryKeyTestString()
self.consume_ts_4551()
self.consume_TS_4540_Test()
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success(f"{__file__} successfully executed") tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase()) tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase()) tdCases.addWindows(__file__, TDTestCase())