fix:[TD-30755] fix heap-buffer-overflow when no qualified data in pk's data block.
This commit is contained in:
parent
dfe260fbde
commit
6282ebef22
|
@ -2265,13 +2265,16 @@ static void processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffset
|
||||||
doBlockDataPrimaryKeyFilter(pBlock, offset);
|
doBlockDataPrimaryKeyFilter(pBlock, offset);
|
||||||
SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1);
|
SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1);
|
||||||
|
|
||||||
|
if (pBlock->info.rows < 1) {
|
||||||
|
return ;
|
||||||
|
}
|
||||||
void* tmp = colDataGetData(pColPk, pBlock->info.rows - 1);
|
void* tmp = colDataGetData(pColPk, pBlock->info.rows - 1);
|
||||||
val.type = pColPk->info.type;
|
val.type = pColPk->info.type;
|
||||||
if(IS_VAR_DATA_TYPE(pColPk->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColPk->info.type)) {
|
||||||
val.pData = taosMemoryMalloc(varDataLen(tmp));
|
val.pData = taosMemoryMalloc(varDataLen(tmp));
|
||||||
val.nData = varDataLen(tmp);
|
val.nData = varDataLen(tmp);
|
||||||
memcpy(val.pData, varDataVal(tmp), varDataLen(tmp));
|
memcpy(val.pData, varDataVal(tmp), varDataLen(tmp));
|
||||||
}else{
|
} else {
|
||||||
memcpy(&val.val, tmp, pColPk->info.bytes);
|
memcpy(&val.val, tmp, pColPk->info.bytes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2292,13 +2295,19 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
while (1) {
|
||||||
|
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
||||||
|
|
||||||
if (pResult && pResult->info.rows > 0) {
|
if (pResult && pResult->info.rows > 0) {
|
||||||
bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader);
|
bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader);
|
||||||
processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
|
processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
|
||||||
qDebug("tmqsnap doQueueScan get data uid:%" PRId64 "", pResult->info.id.uid);
|
qDebug("tmqsnap doQueueScan get data uid:%" PRId64 "", pResult->info.id.uid);
|
||||||
return pResult;
|
if (pResult->info.rows > 0) {
|
||||||
|
return pResult;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||||
|
|
|
@ -12,6 +12,7 @@ from util.dnodes import *
|
||||||
from util.common import *
|
from util.common import *
|
||||||
from taos.tmq import *
|
from taos.tmq import *
|
||||||
from util.dnodes import *
|
from util.dnodes import *
|
||||||
|
from util.cluster import *
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
sys.path.append("./7-tmq")
|
sys.path.append("./7-tmq")
|
||||||
|
@ -137,6 +138,7 @@ class TDTestCase:
|
||||||
print("index:" + str(index))
|
print("index:" + str(index))
|
||||||
finally:
|
finally:
|
||||||
consumer.close()
|
consumer.close()
|
||||||
|
tdSql.execute(f'drop topic topic_pk_query;')
|
||||||
|
|
||||||
def primaryKeyTestIntStable(self):
|
def primaryKeyTestIntStable(self):
|
||||||
print("==============Case 2: primary key test int for stable")
|
print("==============Case 2: primary key test int for stable")
|
||||||
|
@ -247,6 +249,7 @@ class TDTestCase:
|
||||||
print("index:" + str(index))
|
print("index:" + str(index))
|
||||||
finally:
|
finally:
|
||||||
consumer.close()
|
consumer.close()
|
||||||
|
tdSql.execute(f'drop topic topic_pk_stable;')
|
||||||
|
|
||||||
def primaryKeyTestInt(self):
|
def primaryKeyTestInt(self):
|
||||||
print("==============Case 3: primary key test int for db")
|
print("==============Case 3: primary key test int for db")
|
||||||
|
@ -356,6 +359,7 @@ class TDTestCase:
|
||||||
print("index:" + str(index))
|
print("index:" + str(index))
|
||||||
finally:
|
finally:
|
||||||
consumer.close()
|
consumer.close()
|
||||||
|
tdSql.execute(f'drop topic topic_in;')
|
||||||
|
|
||||||
def primaryKeyTestString(self):
|
def primaryKeyTestString(self):
|
||||||
print("==============Case 4: primary key test string for db")
|
print("==============Case 4: primary key test string for db")
|
||||||
|
@ -468,12 +472,90 @@ class TDTestCase:
|
||||||
print("index:" + str(index))
|
print("index:" + str(index))
|
||||||
finally:
|
finally:
|
||||||
consumer.close()
|
consumer.close()
|
||||||
|
tdSql.execute(f'drop topic topic_pk_string;')
|
||||||
|
|
||||||
|
def primaryKeyTestTD_30755(self):
|
||||||
|
print("==============Case 5: primary key test td-30755 for query")
|
||||||
|
tdSql.execute(f'create database if not exists db_pk_query_30755 vgroups 1 wal_retention_period 3600;')
|
||||||
|
tdSql.execute(f'use db_pk_query_30755;')
|
||||||
|
tdSql.execute(f'create table if not exists pk (ts timestamp, c1 int primary key, c2 int);')
|
||||||
|
for i in range(0, 100000):
|
||||||
|
tdSql.execute(f'insert into pk values(1669092069068, {i}, 1);')
|
||||||
|
tdSql.execute(f'flush database db_pk_query_30755')
|
||||||
|
|
||||||
|
tdSql.execute(f'create topic topic_pk_query_30755 as select * from pk')
|
||||||
|
|
||||||
|
consumer_dict = {
|
||||||
|
"group.id": "g1",
|
||||||
|
"td.connect.user": "root",
|
||||||
|
"td.connect.pass": "taosdata",
|
||||||
|
"auto.offset.reset": "earliest",
|
||||||
|
"enable.auto.commit": "false",
|
||||||
|
"experimental.snapshot.enable": "true",
|
||||||
|
}
|
||||||
|
consumer = Consumer(consumer_dict)
|
||||||
|
|
||||||
|
try:
|
||||||
|
consumer.subscribe(["topic_pk_query_30755"])
|
||||||
|
except TmqError:
|
||||||
|
tdLog.exit(f"subscribe error")
|
||||||
|
|
||||||
|
firstConsume = 0
|
||||||
|
try:
|
||||||
|
while firstConsume < 50000:
|
||||||
|
res = consumer.poll(1)
|
||||||
|
if not res:
|
||||||
|
continue
|
||||||
|
val = res.value()
|
||||||
|
if val is None:
|
||||||
|
continue
|
||||||
|
for block in val:
|
||||||
|
data = block.fetchall()
|
||||||
|
firstConsume += len(data)
|
||||||
|
consumer.commit(res)
|
||||||
|
finally:
|
||||||
|
consumer.close()
|
||||||
|
|
||||||
|
tdSql.query(f'show subscriptions;')
|
||||||
|
sub = tdSql.getData(0, 4);
|
||||||
|
print(sub)
|
||||||
|
if not sub.startswith("tsdb"):
|
||||||
|
tdLog.exit(f"show subscriptions error")
|
||||||
|
|
||||||
|
tdDnodes.stop(1)
|
||||||
|
time.sleep(2)
|
||||||
|
tdDnodes.start(1)
|
||||||
|
|
||||||
|
consumer = Consumer(consumer_dict)
|
||||||
|
|
||||||
|
tdSql.execute(f'use db_pk_query_30755;')
|
||||||
|
try:
|
||||||
|
consumer.subscribe(["topic_pk_query_30755"])
|
||||||
|
except TmqError:
|
||||||
|
tdLog.exit(f"subscribe error")
|
||||||
|
|
||||||
|
secondConsume = 0
|
||||||
|
try:
|
||||||
|
while firstConsume + secondConsume < 100000:
|
||||||
|
res = consumer.poll(1)
|
||||||
|
if not res:
|
||||||
|
continue
|
||||||
|
val = res.value()
|
||||||
|
if val is None:
|
||||||
|
continue
|
||||||
|
for block in val:
|
||||||
|
data = block.fetchall()
|
||||||
|
secondConsume += len(data)
|
||||||
|
consumer.commit(res)
|
||||||
|
finally:
|
||||||
|
consumer.close()
|
||||||
|
tdSql.execute(f'drop topic topic_pk_query_30755;')
|
||||||
def run(self):
|
def run(self):
|
||||||
self.primaryKeyTestIntQuery()
|
self.primaryKeyTestIntQuery()
|
||||||
self.primaryKeyTestIntStable()
|
self.primaryKeyTestIntStable()
|
||||||
self.primaryKeyTestInt()
|
self.primaryKeyTestInt()
|
||||||
self.primaryKeyTestString()
|
self.primaryKeyTestString()
|
||||||
|
self.primaryKeyTestTD_30755()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
|
|
Loading…
Reference in New Issue