fix:filter data error & add test case
This commit is contained in:
parent
2478519c5d
commit
035b61218c
|
@ -80,7 +80,7 @@ int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, S
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
|
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
|
||||||
const int32_t MAX_ROWS_TO_RETURN = 4096;
|
const int32_t MAX_ROWS_TO_RETURN = 1;
|
||||||
|
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
|
@ -28,8 +28,225 @@ class TDTestCase:
|
||||||
tdLog.debug(f"start to excute {__file__}")
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
tdSql.init(conn.cursor())
|
tdSql.init(conn.cursor())
|
||||||
|
|
||||||
|
def primaryKeyTestIntQuery(self):
|
||||||
|
print("==============Case 1: primary key test int for query")
|
||||||
|
tdSql.execute(f'create database if not exists db_pk_query vgroups 1 wal_retention_period 3600;')
|
||||||
|
tdSql.execute(f'use db_pk_query;')
|
||||||
|
tdSql.execute(f'create table if not exists pk (ts timestamp, c1 int primary key, c2 int);')
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069068, 0, 1);')
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069068, 6, 1);')
|
||||||
|
tdSql.execute(f'flush database db_pk_query')
|
||||||
|
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069069, 0, 1) (1669092069069, 1, 1);')
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069069, 2, 1) (1669092069069, 3, 1);')
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069068, 10, 1) (1669092069068, 16, 1);')
|
||||||
|
|
||||||
|
tdSql.execute(f'create topic topic_pk_query as select * from pk')
|
||||||
|
|
||||||
|
consumer_dict = {
|
||||||
|
"group.id": "g1",
|
||||||
|
"td.connect.user": "root",
|
||||||
|
"td.connect.pass": "taosdata",
|
||||||
|
"auto.offset.reset": "earliest",
|
||||||
|
"enable.auto.commit": "false",
|
||||||
|
"experimental.snapshot.enable": "true",
|
||||||
|
}
|
||||||
|
consumer = Consumer(consumer_dict)
|
||||||
|
|
||||||
|
try:
|
||||||
|
consumer.subscribe(["topic_pk_query"])
|
||||||
|
except TmqError:
|
||||||
|
tdLog.exit(f"subscribe error")
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
res = consumer.poll(1)
|
||||||
|
if not res:
|
||||||
|
break
|
||||||
|
val = res.value()
|
||||||
|
if val is None:
|
||||||
|
continue
|
||||||
|
for block in val:
|
||||||
|
data = block.fetchall()
|
||||||
|
for element in data:
|
||||||
|
print(element)
|
||||||
|
if len(data) != 2:
|
||||||
|
tdLog.exit(f"fetchall len != 2")
|
||||||
|
if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 0, 1),
|
||||||
|
(datetime(2022, 11, 22, 12, 41, 9, 68000), 6, 1)]:
|
||||||
|
tdLog.exit(f"data error")
|
||||||
|
|
||||||
|
consumer.commit(res)
|
||||||
|
break
|
||||||
|
finally:
|
||||||
|
consumer.close()
|
||||||
|
|
||||||
|
tdSql.query(f'show subscriptions;')
|
||||||
|
sub = tdSql.getData(0, 4);
|
||||||
|
print(sub)
|
||||||
|
if not sub.startswith("tsdb"):
|
||||||
|
tdLog.exit(f"show subscriptions error")
|
||||||
|
|
||||||
|
tdSql.execute(f'use db_pk_query;')
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069069, 10, 1);')
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069069, 5, 1);')
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069069, 12, 1);')
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069069, 7, 1);')
|
||||||
|
|
||||||
|
tdSql.execute(f'flush database db_pk_query')
|
||||||
|
|
||||||
|
consumer = Consumer(consumer_dict)
|
||||||
|
|
||||||
|
try:
|
||||||
|
consumer.subscribe(["topic_pk_query"])
|
||||||
|
except TmqError:
|
||||||
|
tdLog.exit(f"subscribe error")
|
||||||
|
|
||||||
|
index = 0
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
res = consumer.poll(1)
|
||||||
|
if not res:
|
||||||
|
break
|
||||||
|
val = res.value()
|
||||||
|
if val is None:
|
||||||
|
continue
|
||||||
|
for block in val:
|
||||||
|
data = block.fetchall()
|
||||||
|
for element in data:
|
||||||
|
print(element)
|
||||||
|
if index == 0:
|
||||||
|
if len(data) != 6:
|
||||||
|
tdLog.exit(f"fetchall len != 6")
|
||||||
|
if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 10, 1),
|
||||||
|
(datetime(2022, 11, 22, 12, 41, 9, 68000), 16, 1),
|
||||||
|
(datetime(2022, 11, 22, 12, 41, 9, 69000), 0, 1),
|
||||||
|
(datetime(2022, 11, 22, 12, 41, 9, 69000), 1, 1),
|
||||||
|
(datetime(2022, 11, 22, 12, 41, 9, 69000), 2, 1),
|
||||||
|
(datetime(2022, 11, 22, 12, 41, 9, 69000), 3, 1)]:
|
||||||
|
tdLog.exit(f"data error")
|
||||||
|
if index >= 1:
|
||||||
|
if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 10, 1)] \
|
||||||
|
and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 5, 1)] \
|
||||||
|
and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 12, 1)] \
|
||||||
|
and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 7, 1)]:
|
||||||
|
tdLog.exit(f"data error")
|
||||||
|
index += 1
|
||||||
|
print("index:" + str(index))
|
||||||
|
finally:
|
||||||
|
consumer.close()
|
||||||
|
|
||||||
|
def primaryKeyTestIntStable(self):
|
||||||
|
print("==============Case 1: primary key test int for stable")
|
||||||
|
tdSql.execute(f'create database if not exists db_pk_stable vgroups 1 wal_retention_period 3600;')
|
||||||
|
tdSql.execute(f'use db_pk_stable;')
|
||||||
|
tdSql.execute(f'create table if not exists pks (ts timestamp, c1 int primary key, c2 int) tags (t int);')
|
||||||
|
tdSql.execute(f'create table if not exists pk using pks tags(1);')
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069068, 0, 1);')
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069068, 6, 1);')
|
||||||
|
tdSql.execute(f'flush database db_pk_stable')
|
||||||
|
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069069, 0, 1) (1669092069069, 1, 1);')
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069069, 2, 1) (1669092069069, 3, 1);')
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069068, 10, 1) (1669092069068, 16, 1);')
|
||||||
|
|
||||||
|
tdSql.execute(f'create topic topic_pk_stable as stable pks')
|
||||||
|
|
||||||
|
consumer_dict = {
|
||||||
|
"group.id": "g1",
|
||||||
|
"td.connect.user": "root",
|
||||||
|
"td.connect.pass": "taosdata",
|
||||||
|
"auto.offset.reset": "earliest",
|
||||||
|
"enable.auto.commit": "false",
|
||||||
|
"experimental.snapshot.enable": "true",
|
||||||
|
}
|
||||||
|
consumer = Consumer(consumer_dict)
|
||||||
|
|
||||||
|
try:
|
||||||
|
consumer.subscribe(["topic_pk_stable"])
|
||||||
|
except TmqError:
|
||||||
|
tdLog.exit(f"subscribe error")
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
res = consumer.poll(1)
|
||||||
|
if not res:
|
||||||
|
break
|
||||||
|
val = res.value()
|
||||||
|
if val is None:
|
||||||
|
continue
|
||||||
|
for block in val:
|
||||||
|
data = block.fetchall()
|
||||||
|
for element in data:
|
||||||
|
print(element)
|
||||||
|
if len(data) != 2:
|
||||||
|
tdLog.exit(f"fetchall len != 2")
|
||||||
|
if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 0, 1),
|
||||||
|
(datetime(2022, 11, 22, 12, 41, 9, 68000), 6, 1)]:
|
||||||
|
tdLog.exit(f"data error")
|
||||||
|
|
||||||
|
consumer.commit(res)
|
||||||
|
break
|
||||||
|
finally:
|
||||||
|
consumer.close()
|
||||||
|
|
||||||
|
tdSql.query(f'show subscriptions;')
|
||||||
|
sub = tdSql.getData(0, 4);
|
||||||
|
print(sub)
|
||||||
|
if not sub.startswith("tsdb"):
|
||||||
|
tdLog.exit(f"show subscriptions error")
|
||||||
|
|
||||||
|
tdSql.execute(f'use db_pk_stable;')
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069069, 10, 1);')
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069069, 5, 1);')
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069069, 12, 1);')
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069069, 7, 1);')
|
||||||
|
|
||||||
|
tdSql.execute(f'flush database db_pk_stable')
|
||||||
|
|
||||||
|
consumer = Consumer(consumer_dict)
|
||||||
|
|
||||||
|
try:
|
||||||
|
consumer.subscribe(["topic_pk_stable"])
|
||||||
|
except TmqError:
|
||||||
|
tdLog.exit(f"subscribe error")
|
||||||
|
|
||||||
|
index = 0
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
res = consumer.poll(1)
|
||||||
|
if not res:
|
||||||
|
break
|
||||||
|
val = res.value()
|
||||||
|
if val is None:
|
||||||
|
continue
|
||||||
|
for block in val:
|
||||||
|
data = block.fetchall()
|
||||||
|
for element in data:
|
||||||
|
print(element)
|
||||||
|
if index == 0:
|
||||||
|
if len(data) != 6:
|
||||||
|
tdLog.exit(f"fetchall len != 6")
|
||||||
|
if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 10, 1),
|
||||||
|
(datetime(2022, 11, 22, 12, 41, 9, 68000), 16, 1),
|
||||||
|
(datetime(2022, 11, 22, 12, 41, 9, 69000), 0, 1),
|
||||||
|
(datetime(2022, 11, 22, 12, 41, 9, 69000), 1, 1),
|
||||||
|
(datetime(2022, 11, 22, 12, 41, 9, 69000), 2, 1),
|
||||||
|
(datetime(2022, 11, 22, 12, 41, 9, 69000), 3, 1)]:
|
||||||
|
tdLog.exit(f"data error")
|
||||||
|
if index >= 1:
|
||||||
|
if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 10, 1)] \
|
||||||
|
and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 5, 1)] \
|
||||||
|
and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 12, 1)] \
|
||||||
|
and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 7, 1)]:
|
||||||
|
tdLog.exit(f"data error")
|
||||||
|
index += 1
|
||||||
|
print("index:" + str(index))
|
||||||
|
finally:
|
||||||
|
consumer.close()
|
||||||
|
|
||||||
def primaryKeyTestInt(self):
|
def primaryKeyTestInt(self):
|
||||||
print("==============Case 1: primary key test int")
|
print("==============Case 1: primary key test int for db")
|
||||||
tdSql.execute(f'create database if not exists abc1 vgroups 1 wal_retention_period 3600;')
|
tdSql.execute(f'create database if not exists abc1 vgroups 1 wal_retention_period 3600;')
|
||||||
tdSql.execute(f'use abc1;')
|
tdSql.execute(f'use abc1;')
|
||||||
tdSql.execute(f'create table if not exists pk (ts timestamp, c1 int primary key, c2 int);')
|
tdSql.execute(f'create table if not exists pk (ts timestamp, c1 int primary key, c2 int);')
|
||||||
|
@ -125,23 +342,24 @@ class TDTestCase:
|
||||||
(datetime(2022, 11, 22, 12, 41, 9, 69000), 2, 1),
|
(datetime(2022, 11, 22, 12, 41, 9, 69000), 2, 1),
|
||||||
(datetime(2022, 11, 22, 12, 41, 9, 69000), 3, 1)]:
|
(datetime(2022, 11, 22, 12, 41, 9, 69000), 3, 1)]:
|
||||||
tdLog.exit(f"data error")
|
tdLog.exit(f"data error")
|
||||||
if index == 1:
|
if index >= 1:
|
||||||
if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 10, 1)] \
|
if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 10, 1)] \
|
||||||
and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 5, 1)] \
|
and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 5, 1)] \
|
||||||
and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 12, 1)] \
|
and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 12, 1)] \
|
||||||
and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 7, 1)]:
|
and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 7, 1)]:
|
||||||
tdLog.exit(f"data error")
|
tdLog.exit(f"data error")
|
||||||
index += 1
|
index += 1
|
||||||
|
print("index:" + str(index))
|
||||||
finally:
|
finally:
|
||||||
consumer.close()
|
consumer.close()
|
||||||
|
|
||||||
def primaryKeyTestString(self):
|
def primaryKeyTestString(self):
|
||||||
print("==============Case 2: primary key test string")
|
print("==============Case 2: primary key test string for db")
|
||||||
tdSql.execute(f'create database if not exists db_pk_string vgroups 1 wal_retention_period 3600;')
|
tdSql.execute(f'create database if not exists db_pk_string vgroups 1 wal_retention_period 3600;')
|
||||||
tdSql.execute(f'use db_pk_string;')
|
tdSql.execute(f'use db_pk_string;')
|
||||||
tdSql.execute(f'create table if not exists pk (ts timestamp, c1 varchar(64) primary key, c2 int);')
|
tdSql.execute(f'create table if not exists pk (ts timestamp, c1 varchar(64) primary key, c2 int);')
|
||||||
tdSql.execute(f'insert into pk values(1669092069068, "hello", 1);')
|
tdSql.execute(f'insert into pk values(1669092069068, "ahello", 1);')
|
||||||
tdSql.execute(f'insert into pk values(1669092069068, "word", 1);')
|
tdSql.execute(f'insert into pk values(1669092069068, "aworld", 1);')
|
||||||
tdSql.execute(f'flush database db_pk_string')
|
tdSql.execute(f'flush database db_pk_string')
|
||||||
|
|
||||||
tdSql.execute(f'insert into pk values(1669092069069, "him", 1) (1669092069069, "value", 1);')
|
tdSql.execute(f'insert into pk values(1669092069069, "him", 1) (1669092069069, "value", 1);')
|
||||||
|
@ -179,9 +397,9 @@ class TDTestCase:
|
||||||
print(element)
|
print(element)
|
||||||
if len(data) != 2:
|
if len(data) != 2:
|
||||||
tdLog.exit(f"fetchall len != 2")
|
tdLog.exit(f"fetchall len != 2")
|
||||||
# if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), "hello", 1),
|
if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 'ahello', 1),
|
||||||
# (datetime(2022, 11, 22, 12, 41, 9, 68000), "world", 1)]:
|
(datetime(2022, 11, 22, 12, 41, 9, 68000), 'aworld', 1)]:
|
||||||
# tdLog.exit(f"data error")
|
tdLog.exit(f"data error")
|
||||||
|
|
||||||
consumer.commit(res)
|
consumer.commit(res)
|
||||||
break
|
break
|
||||||
|
@ -207,19 +425,23 @@ class TDTestCase:
|
||||||
tdSql.execute(f'flush database db_pk_string')
|
tdSql.execute(f'flush database db_pk_string')
|
||||||
|
|
||||||
consumer = Consumer(consumer_dict)
|
consumer = Consumer(consumer_dict)
|
||||||
|
print(1)
|
||||||
try:
|
try:
|
||||||
consumer.subscribe(["topic_pk_string"])
|
consumer.subscribe(["topic_pk_string"])
|
||||||
except TmqError:
|
except TmqError:
|
||||||
tdLog.exit(f"subscribe error")
|
tdLog.exit(f"subscribe error")
|
||||||
|
time.sleep(5)
|
||||||
index = 0
|
index = 0
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
|
print(consumer)
|
||||||
res = consumer.poll(1)
|
res = consumer.poll(1)
|
||||||
|
print(res)
|
||||||
if not res:
|
if not res:
|
||||||
|
print(11323)
|
||||||
break
|
break
|
||||||
val = res.value()
|
val = res.value()
|
||||||
|
print(12)
|
||||||
if val is None:
|
if val is None:
|
||||||
continue
|
continue
|
||||||
for block in val:
|
for block in val:
|
||||||
|
@ -229,27 +451,28 @@ class TDTestCase:
|
||||||
if index == 0:
|
if index == 0:
|
||||||
if len(data) != 6:
|
if len(data) != 6:
|
||||||
tdLog.exit(f"fetchall len != 6")
|
tdLog.exit(f"fetchall len != 6")
|
||||||
# if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 10, 1),
|
if data != [(datetime(2022, 11, 22, 12, 41, 9, 68000), 'from', 1),
|
||||||
# (datetime(2022, 11, 22, 12, 41, 9, 68000), 16, 1),
|
(datetime(2022, 11, 22, 12, 41, 9, 68000), 'it', 1),
|
||||||
# (datetime(2022, 11, 22, 12, 41, 9, 69000), 0, 1),
|
(datetime(2022, 11, 22, 12, 41, 9, 69000), 'him', 1),
|
||||||
# (datetime(2022, 11, 22, 12, 41, 9, 69000), 1, 1),
|
(datetime(2022, 11, 22, 12, 41, 9, 69000), 'like', 1),
|
||||||
# (datetime(2022, 11, 22, 12, 41, 9, 69000), 2, 1),
|
(datetime(2022, 11, 22, 12, 41, 9, 69000), 'she', 1),
|
||||||
# (datetime(2022, 11, 22, 12, 41, 9, 69000), 3, 1)]:
|
(datetime(2022, 11, 22, 12, 41, 9, 69000), 'value', 1)]:
|
||||||
# tdLog.exit(f"data error")
|
tdLog.exit(f"data error")
|
||||||
if index == 1:
|
if index >= 1:
|
||||||
if len(data) != 4:
|
if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), "10", 1)] \
|
||||||
tdLog.exit(f"fetchall len != 4")
|
and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), "5", 1)] \
|
||||||
# if data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), 10, 1),
|
and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), "12", 1)] \
|
||||||
# (datetime(2022, 11, 22, 12, 41, 9, 69000), 5, 1),
|
and data != [(datetime(2022, 11, 22, 12, 41, 9, 69000), "7", 1)]:
|
||||||
# (datetime(2022, 11, 22, 12, 41, 9, 69000), 12, 1),
|
tdLog.exit(f"data error")
|
||||||
# (datetime(2022, 11, 22, 12, 41, 9, 69000), 7, 1)]:
|
|
||||||
# tdLog.exit(f"data error")
|
|
||||||
|
|
||||||
index += 1
|
index += 1
|
||||||
|
print("index:" + str(index))
|
||||||
finally:
|
finally:
|
||||||
consumer.close()
|
consumer.close()
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
# self.primaryKeyTestIntQuery()
|
||||||
|
# self.primaryKeyTestIntStable()
|
||||||
# self.primaryKeyTestInt()
|
# self.primaryKeyTestInt()
|
||||||
self.primaryKeyTestString()
|
self.primaryKeyTestString()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue