fix: tag scan support limit and remove slimit optimization
This commit is contained in:
parent
5204c99cc7
commit
24436c8502
|
@ -307,7 +307,6 @@ typedef struct STagScanInfo {
|
|||
SSDataBlock* pRes;
|
||||
SColMatchInfo matchInfo;
|
||||
int32_t curPos;
|
||||
SLimitNode* pSlimit;
|
||||
SReadHandle readHandle;
|
||||
STableListInfo* pTableListInfo;
|
||||
uint64_t suid;
|
||||
|
@ -318,6 +317,7 @@ typedef struct STagScanInfo {
|
|||
SArray* aUidTags; // SArray<STUidTagInfo>
|
||||
SArray* aFilterIdxs; // SArray<int32_t>
|
||||
SStorageAPI* pStorageAPI;
|
||||
SLimitInfo limitInfo;
|
||||
} STagScanInfo;
|
||||
|
||||
typedef enum EStreamScanMode {
|
||||
|
|
|
@ -3060,7 +3060,12 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
|
|||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
pRes->info.rows = count;
|
||||
pOperator->resultInfo.totalRows += count;
|
||||
|
||||
bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo);
|
||||
if (bLimitReached) {
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
pOperator->resultInfo.totalRows += pRes->info.rows;
|
||||
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||
}
|
||||
|
||||
|
@ -3094,28 +3099,20 @@ static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) {
|
|||
if (++pInfo->curPos >= size) {
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
// each table with tbname is a group, hence its own block, but only group when slimit exists for performance reason.
|
||||
if (pInfo->pSlimit != NULL) {
|
||||
if (pInfo->curPos < pInfo->pSlimit->offset) {
|
||||
continue;
|
||||
}
|
||||
pInfo->pRes->info.id.groupId = calcGroupId(mr.me.name, strlen(mr.me.name));
|
||||
if (pInfo->curPos >= (pInfo->pSlimit->offset + pInfo->pSlimit->limit) - 1) {
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
pRes->info.rows = count;
|
||||
|
||||
pAPI->metaReaderFn.clearReader(&mr);
|
||||
|
||||
bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo);
|
||||
if (bLimitReached) {
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
// qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||
}
|
||||
|
||||
pRes->info.rows = count;
|
||||
pOperator->resultInfo.totalRows += count;
|
||||
pOperator->resultInfo.totalRows += pRes->info.rows;
|
||||
|
||||
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||
}
|
||||
|
@ -3169,8 +3166,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
|||
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
|
||||
pInfo->readHandle = *pReadHandle;
|
||||
pInfo->curPos = 0;
|
||||
pInfo->pSlimit = (SLimitNode*)pPhyNode->node.pSlimit; //TODO: slimit now only indicate group
|
||||
|
||||
initLimitInfo(pPhyNode->node.pLimit, pPhyNode->node.pSlimit, &pInfo->limitInfo);
|
||||
setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
|
||||
pTaskInfo);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
|
|
@ -2730,36 +2730,6 @@ static bool tagScanOptShouldBeOptimized(SLogicNode* pNode) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static SLogicNode* tagScanOptFindAncestorWithSlimit(SLogicNode* pTableScanNode) {
|
||||
SLogicNode* pNode = pTableScanNode->pParent;
|
||||
while (NULL != pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode) || QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) ||
|
||||
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode) || QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pNode)) {
|
||||
return NULL;
|
||||
}
|
||||
if (NULL != pNode->pSlimit) {
|
||||
return pNode;
|
||||
}
|
||||
pNode = pNode->pParent;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void tagScanOptCloneAncestorSlimit(SLogicNode* pTableScanNode) {
|
||||
if (NULL != pTableScanNode->pSlimit) {
|
||||
return;
|
||||
}
|
||||
|
||||
SLogicNode* pNode = tagScanOptFindAncestorWithSlimit(pTableScanNode);
|
||||
if (NULL != pNode) {
|
||||
// TODO: only set the slimit now. push down slimit later
|
||||
pTableScanNode->pSlimit = nodesCloneNode(pNode->pSlimit);
|
||||
((SLimitNode*)pTableScanNode->pSlimit)->limit += ((SLimitNode*)pTableScanNode->pSlimit)->offset;
|
||||
((SLimitNode*)pTableScanNode->pSlimit)->offset = 0;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||
SScanLogicNode* pScanNode = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, tagScanOptShouldBeOptimized);
|
||||
if (NULL == pScanNode) {
|
||||
|
@ -2795,13 +2765,6 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
|
|||
pScanNode->node.pTargets = pScanTargets;
|
||||
}
|
||||
|
||||
int32_t code = replaceLogicNode(pLogicSubplan, pAgg, (SLogicNode*)pScanNode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
NODES_CLEAR_LIST(pAgg->pChildren);
|
||||
}
|
||||
nodesDestroyNode((SNode*)pAgg);
|
||||
tagScanOptCloneAncestorSlimit((SLogicNode*)pScanNode);
|
||||
|
||||
pScanNode->onlyMetaCtbIdx = false;
|
||||
|
||||
pCxt->optimized = true;
|
||||
|
|
|
@ -0,0 +1,206 @@
|
|||
import sys
|
||||
from util.log import *
|
||||
from util.cases import *
|
||||
from util.sql import *
|
||||
from util.dnodes import tdDnodes
|
||||
from math import inf
|
||||
|
||||
class TDTestCase:
|
||||
def caseDescription(self):
|
||||
'''
|
||||
case1<shenglian zhou>: [TD-11204]Difference improvement that can ignore negative
|
||||
'''
|
||||
return
|
||||
|
||||
def init(self, conn, logSql, replicaVer=1):
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), True)
|
||||
self._conn = conn
|
||||
|
||||
def restartTaosd(self, index=1, dbname="db"):
|
||||
tdDnodes.stop(index)
|
||||
tdDnodes.startWithoutSleep(index)
|
||||
tdSql.execute(f"use tagscan")
|
||||
|
||||
|
||||
def runSingleVgroup(self):
|
||||
print("running {}".format(__file__))
|
||||
tdSql.execute("drop database if exists tagscan2")
|
||||
tdSql.execute("create database if not exists tagscan2 vgroups 1")
|
||||
tdSql.execute('use tagscan2')
|
||||
tdSql.execute('create table stb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(10), c9 nchar(10), c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned) TAGS(t1 int, t2 binary(10), t3 double);')
|
||||
|
||||
tdSql.execute("create table tb1 using stb1 tags(1,'1',1.0);")
|
||||
|
||||
tdSql.execute("create table tb2 using stb1 tags(2,'2',2.0);")
|
||||
|
||||
tdSql.execute("create table tb3 using stb1 tags(3,'3',3.0);")
|
||||
|
||||
tdSql.execute("create table tb4 using stb1 tags(4,'4',4.0);")
|
||||
|
||||
tdSql.execute("create table tb5 using stb1 tags(5,'5',5.0);")
|
||||
|
||||
tdSql.execute("create table tb6 using stb1 tags(5,'5',5.0);")
|
||||
|
||||
tdSql.execute('insert into tb1 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
|
||||
|
||||
tdSql.execute('insert into tb2 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
|
||||
|
||||
tdSql.execute('insert into tb3 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
|
||||
|
||||
tdSql.execute('insert into tb4 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
|
||||
|
||||
tdSql.execute('insert into tb5 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
|
||||
|
||||
tdSql.execute('insert into tb6 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
|
||||
|
||||
tdSql.query('select tags t1,t2 from stb1 order by t1,t2;')
|
||||
tdSql.checkRows(6)
|
||||
tdSql.checkData(0, 0, 1)
|
||||
tdSql.checkData(0, 1, '1')
|
||||
tdSql.checkData(1, 0, 2)
|
||||
tdSql.checkData(1, 1, '2')
|
||||
tdSql.checkData(2, 0, 3)
|
||||
tdSql.checkData(2, 1, '3')
|
||||
tdSql.checkData(3, 0, 4)
|
||||
tdSql.checkData(3, 1, '4')
|
||||
tdSql.checkData(4, 0, 5)
|
||||
tdSql.checkData(4, 1, '5')
|
||||
tdSql.checkData(5, 0, 5)
|
||||
tdSql.checkData(5, 1, '5')
|
||||
|
||||
tdSql.query('select * from (select tags t1,t2 from stb1 group by t1,t2 slimit 2,3) order by t1,t2;')
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query('select * from (select tags tbname tn from stb1 group by tbname slimit 2,3) order by tn;')
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query('select * from (select tbname tn from stb1 group by tbname slimit 2,3) order by tn;')
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query('select * from (select tbname tn from stb1 group by tbname order by tbname limit 2,3) order by tn;')
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkData(0, 0, 'tb3')
|
||||
|
||||
tdSql.query('select * from (select distinct tbname tn from stb1 limit 2,3) order by tn;')
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query('select * from (select distinct tbname tn, t1,t2 from stb1 limit 2,3) order by tn;')
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query('select * from (select tags t1,t2 from stb1 order by t1, t2 limit 2,3) order by t1, t2;')
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkData(0, 0, 3)
|
||||
tdSql.checkData(0, 1, '3')
|
||||
tdSql.checkData(1, 0, 4)
|
||||
tdSql.checkData(1, 1, '4')
|
||||
tdSql.checkData(2, 0, 5)
|
||||
tdSql.checkData(2, 1, '5')
|
||||
|
||||
tdSql.query('select * from (select tbname tn, t1,t2 from stb1 partition by tbname slimit 2,3) order by tn;')
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query('select * from (select tbname tn, t1,t2 from stb1 group by tbname, t1,t2 slimit 2,3) order by tn;')
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query('select * from (select tags tbname tn, t1,t2 from stb1 group by tbname, t1,t2 slimit 2,3) order by tn;')
|
||||
tdSql.checkRows(3)
|
||||
|
||||
|
||||
tdSql.execute('drop database tagscan2')
|
||||
def runMultiVgroups(self):
|
||||
print("running {}".format(__file__))
|
||||
tdSql.execute("drop database if exists tagscan")
|
||||
tdSql.execute("create database if not exists tagscan")
|
||||
tdSql.execute('use tagscan')
|
||||
tdSql.execute('create table stb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(10), c9 nchar(10), c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned) TAGS(t1 int, t2 binary(10), t3 double);')
|
||||
|
||||
tdSql.execute("create table tb1 using stb1 tags(1,'1',1.0);")
|
||||
|
||||
tdSql.execute("create table tb2 using stb1 tags(2,'2',2.0);")
|
||||
|
||||
tdSql.execute("create table tb3 using stb1 tags(3,'3',3.0);")
|
||||
|
||||
tdSql.execute("create table tb4 using stb1 tags(4,'4',4.0);")
|
||||
|
||||
tdSql.execute("create table tb5 using stb1 tags(5,'5',5.0);")
|
||||
|
||||
tdSql.execute("create table tb6 using stb1 tags(5,'5',5.0);")
|
||||
|
||||
tdSql.execute('insert into tb1 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
|
||||
|
||||
tdSql.execute('insert into tb2 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
|
||||
|
||||
tdSql.execute('insert into tb3 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
|
||||
|
||||
tdSql.execute('insert into tb4 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
|
||||
|
||||
tdSql.execute('insert into tb5 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
|
||||
|
||||
tdSql.execute('insert into tb6 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
|
||||
|
||||
tdSql.query('select tags t1,t2 from stb1 order by t1,t2;')
|
||||
tdSql.checkRows(6)
|
||||
tdSql.checkData(0, 0, 1)
|
||||
tdSql.checkData(0, 1, '1')
|
||||
tdSql.checkData(1, 0, 2)
|
||||
tdSql.checkData(1, 1, '2')
|
||||
tdSql.checkData(2, 0, 3)
|
||||
tdSql.checkData(2, 1, '3')
|
||||
tdSql.checkData(3, 0, 4)
|
||||
tdSql.checkData(3, 1, '4')
|
||||
tdSql.checkData(4, 0, 5)
|
||||
tdSql.checkData(4, 1, '5')
|
||||
tdSql.checkData(5, 0, 5)
|
||||
tdSql.checkData(5, 1, '5')
|
||||
|
||||
tdSql.query('select * from (select tags t1,t2 from stb1 group by t1,t2 slimit 2,3) order by t1,t2;')
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query('select * from (select tags tbname tn from stb1 group by tbname slimit 2,3) order by tn;')
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query('select * from (select tbname tn from stb1 group by tbname slimit 2,3) order by tn;')
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query('select * from (select tbname tn from stb1 group by tbname order by tbname limit 2,3) order by tn;')
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkData(0, 0, 'tb3')
|
||||
|
||||
tdSql.query('select * from (select distinct tbname tn from stb1 limit 2,3) order by tn;')
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query('select * from (select distinct tbname tn, t1,t2 from stb1 limit 2,3) order by tn;')
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query('select * from (select tags t1,t2 from stb1 order by t1, t2 limit 2,3) order by t1, t2;')
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkData(0, 0, 3)
|
||||
tdSql.checkData(0, 1, '3')
|
||||
tdSql.checkData(1, 0, 4)
|
||||
tdSql.checkData(1, 1, '4')
|
||||
tdSql.checkData(2, 0, 5)
|
||||
tdSql.checkData(2, 1, '5')
|
||||
|
||||
tdSql.query('select * from (select tbname tn, t1,t2 from stb1 partition by tbname slimit 2,3) order by tn;')
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query('select * from (select tbname tn, t1,t2 from stb1 group by tbname, t1,t2 slimit 2,3) order by tn;')
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query('select * from (select tags tbname tn, t1,t2 from stb1 group by tbname, t1,t2 slimit 2,3) order by tn;')
|
||||
tdSql.checkRows(3)
|
||||
|
||||
|
||||
tdSql.execute('drop database tagscan')
|
||||
|
||||
def run(self):
|
||||
self.runMultiVgroups()
|
||||
self.runSingleVgroup()
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -1271,6 +1271,7 @@
|
|||
#develop test
|
||||
,,n,develop-test,python3 ./test.py -f 2-query/table_count_scan.py
|
||||
,,n,develop-test,python3 ./test.py -f 2-query/ts-range.py
|
||||
,,n,develop-test,python3 ./test.py -f 2-query/tag_scan.py
|
||||
,,n,develop-test,python3 ./test.py -f 2-query/show_create_db.py
|
||||
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/auto_create_table_json.py
|
||||
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/custom_col_tag.py
|
||||
|
|
Loading…
Reference in New Issue