diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f7c51c43b5..6c512d4859 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2265,13 +2265,16 @@ static void processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffset doBlockDataPrimaryKeyFilter(pBlock, offset); SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1); + if (pBlock->info.rows < 1) { + return ; + } void* tmp = colDataGetData(pColPk, pBlock->info.rows - 1); val.type = pColPk->info.type; - if(IS_VAR_DATA_TYPE(pColPk->info.type)) { + if (IS_VAR_DATA_TYPE(pColPk->info.type)) { val.pData = taosMemoryMalloc(varDataLen(tmp)); val.nData = varDataLen(tmp); memcpy(val.pData, varDataVal(tmp), varDataLen(tmp)); - }else{ + } else { memcpy(&val.val, tmp, pColPk->info.bytes); } } @@ -2292,13 +2295,19 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { } if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { - SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); + while (1) { + SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); - if (pResult && pResult->info.rows > 0) { - bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader); - processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset); - qDebug("tmqsnap doQueueScan get data uid:%" PRId64 "", pResult->info.id.uid); - return pResult; + if (pResult && pResult->info.rows > 0) { + bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader); + processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset); + qDebug("tmqsnap doQueueScan get data uid:%" PRId64 "", pResult->info.id.uid); + if (pResult->info.rows > 0) { + return pResult; + } + } else { + break; + } } STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; diff --git a/tests/system-test/7-tmq/tmq_primary_key.py b/tests/system-test/7-tmq/tmq_primary_key.py index 7d0d3da4fc..80888ddbe6 100644 --- a/tests/system-test/7-tmq/tmq_primary_key.py +++ b/tests/system-test/7-tmq/tmq_primary_key.py @@ -12,6 +12,7 @@ from util.dnodes import * from util.common import * from taos.tmq import * from util.dnodes import * +from util.cluster import * import datetime sys.path.append("./7-tmq") @@ -137,6 +138,7 @@ class TDTestCase: print("index:" + str(index)) finally: consumer.close() + tdSql.execute(f'drop topic topic_pk_query;') def primaryKeyTestIntStable(self): print("==============Case 2: primary key test int for stable") @@ -247,6 +249,7 @@ class TDTestCase: print("index:" + str(index)) finally: consumer.close() + tdSql.execute(f'drop topic topic_pk_stable;') def primaryKeyTestInt(self): print("==============Case 3: primary key test int for db") @@ -356,6 +359,7 @@ class TDTestCase: print("index:" + str(index)) finally: consumer.close() + tdSql.execute(f'drop topic topic_in;') def primaryKeyTestString(self): print("==============Case 4: primary key test string for db") @@ -468,12 +472,90 @@ class TDTestCase: print("index:" + str(index)) finally: consumer.close() + tdSql.execute(f'drop topic topic_pk_string;') + def primaryKeyTestTD_30755(self): + print("==============Case 5: primary key test td-30755 for query") + tdSql.execute(f'create database if not exists db_pk_query_30755 vgroups 1 wal_retention_period 3600;') + tdSql.execute(f'use db_pk_query_30755;') + tdSql.execute(f'create table if not exists pk (ts timestamp, c1 int primary key, c2 int);') + for i in range(0, 100000): + tdSql.execute(f'insert into pk values(1669092069068, {i}, 1);') + tdSql.execute(f'flush database db_pk_query_30755') + + tdSql.execute(f'create topic topic_pk_query_30755 as select * from pk') + + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + "enable.auto.commit": "false", + "experimental.snapshot.enable": "true", + } + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["topic_pk_query_30755"]) + except TmqError: + tdLog.exit(f"subscribe error") + + firstConsume = 0 + try: + while firstConsume < 50000: + res = consumer.poll(1) + if not res: + continue + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + firstConsume += len(data) + consumer.commit(res) + finally: + consumer.close() + + tdSql.query(f'show subscriptions;') + sub = tdSql.getData(0, 4); + print(sub) + if not sub.startswith("tsdb"): + tdLog.exit(f"show subscriptions error") + + tdDnodes.stop(1) + time.sleep(2) + tdDnodes.start(1) + + consumer = Consumer(consumer_dict) + + tdSql.execute(f'use db_pk_query_30755;') + try: + consumer.subscribe(["topic_pk_query_30755"]) + except TmqError: + tdLog.exit(f"subscribe error") + + secondConsume = 0 + try: + while firstConsume + secondConsume < 100000: + res = consumer.poll(1) + if not res: + continue + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + secondConsume += len(data) + consumer.commit(res) + finally: + consumer.close() + tdSql.execute(f'drop topic topic_pk_query_30755;') def run(self): self.primaryKeyTestIntQuery() self.primaryKeyTestIntStable() self.primaryKeyTestInt() self.primaryKeyTestString() + self.primaryKeyTestTD_30755() def stop(self): tdSql.close()