Merge pull request #25232 from taosdata/feat/TD-29093/2
Feat/td 29093/2
This commit is contained in:
commit
5091247957
|
@ -2136,6 +2136,9 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
|
|||
}
|
||||
|
||||
pTableListInfo->oneTableForEachGroup = groupByTbname;
|
||||
if (numOfTables == 1 && pTableListInfo->idInfo.tableType == TSDB_CHILD_TABLE) {
|
||||
pTableListInfo->oneTableForEachGroup = true;
|
||||
}
|
||||
|
||||
if (groupSort && groupByTbname) {
|
||||
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
|
||||
|
|
|
@ -889,14 +889,15 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
|
|||
|
||||
if (pTableScanInfo->countState < TABLE_COUNT_STATE_END) {
|
||||
STableListInfo* pTableListInfo = pTableScanInfo->base.pTableListInfo;
|
||||
if (pTableListInfo->oneTableForEachGroup || pTableListInfo->groupOffset) { // group by tbname, group by tag + sort
|
||||
if (pTableListInfo->oneTableForEachGroup || pTableListInfo->groupOffset) { // group by tbname, group by tag + sort
|
||||
if (pTableScanInfo->countState < TABLE_COUNT_STATE_PROCESSED) {
|
||||
pTableScanInfo->countState = TABLE_COUNT_STATE_PROCESSED;
|
||||
STableKeyInfo* pStart =
|
||||
(STableKeyInfo*)tableListGetInfo(pTableScanInfo->base.pTableListInfo, pTableScanInfo->tableStartIndex);
|
||||
if (NULL == pStart) return NULL;
|
||||
return getBlockForEmptyTable(pOperator, pStart);
|
||||
}
|
||||
} else { // group by tag + no sort
|
||||
} else { // group by tag + no sort
|
||||
int32_t numOfTables = tableListGetSize(pTableListInfo);
|
||||
if (pTableScanInfo->tableEndIndex + 1 >= numOfTables) {
|
||||
// get empty group, mark processed & rm from hash
|
||||
|
|
|
@ -2696,6 +2696,29 @@ static EDealRes rewriteExprToGroupKeyFunc(STranslateContext* pCxt, SNode** pNode
|
|||
return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
|
||||
}
|
||||
|
||||
static bool isTbnameFuction(SNode* pNode) {
|
||||
return QUERY_NODE_FUNCTION == nodeType(pNode) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pNode)->funcType;
|
||||
}
|
||||
|
||||
static bool hasTbnameFunction(SNodeList* pPartitionByList) {
|
||||
SNode* pPartKey = NULL;
|
||||
FOREACH(pPartKey, pPartitionByList) {
|
||||
if (isTbnameFuction(pPartKey)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool fromSubtable(SNode* table) {
|
||||
if (NULL == table) return false;
|
||||
if (table->type == QUERY_NODE_REAL_TABLE && ((SRealTableNode*)table)->pMeta &&
|
||||
((SRealTableNode*)table)->pMeta->tableType == TSDB_CHILD_TABLE) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
|
||||
STranslateContext* pCxt = (STranslateContext*)pContext;
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
|
||||
|
@ -2707,15 +2730,25 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
|
|||
}
|
||||
SNode* pGroupNode = NULL;
|
||||
FOREACH(pGroupNode, getGroupByList(pCxt)) {
|
||||
if (nodesEqualNode(getGroupByNode(pGroupNode), *pNode)) {
|
||||
SNode* pActualNode = getGroupByNode(pGroupNode);
|
||||
if (nodesEqualNode(pActualNode, *pNode)) {
|
||||
return DEAL_RES_IGNORE_CHILD;
|
||||
}
|
||||
if (isTbnameFuction(pActualNode) && QUERY_NODE_COLUMN == nodeType(*pNode) &&
|
||||
((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) {
|
||||
return rewriteExprToGroupKeyFunc(pCxt, pNode);
|
||||
}
|
||||
}
|
||||
SNode* pPartKey = NULL;
|
||||
bool partionByTbname = hasTbnameFunction(pSelect->pPartitionByList);
|
||||
FOREACH(pPartKey, pSelect->pPartitionByList) {
|
||||
if (nodesEqualNode(pPartKey, *pNode)) {
|
||||
return rewriteExprToGroupKeyFunc(pCxt, pNode);
|
||||
}
|
||||
if ((partionByTbname) && QUERY_NODE_COLUMN == nodeType(*pNode) &&
|
||||
((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) {
|
||||
return rewriteExprToGroupKeyFunc(pCxt, pNode);
|
||||
}
|
||||
}
|
||||
if (NULL != pSelect->pWindow && QUERY_NODE_STATE_WINDOW == nodeType(pSelect->pWindow)) {
|
||||
if (nodesEqualNode(((SStateWindowNode*)pSelect->pWindow)->pExpr, *pNode)) {
|
||||
|
@ -2779,11 +2812,19 @@ static EDealRes doCheckAggColCoexist(SNode** pNode, void* pContext) {
|
|||
return DEAL_RES_IGNORE_CHILD;
|
||||
}
|
||||
SNode* pPartKey = NULL;
|
||||
bool partionByTbname = false;
|
||||
if (fromSubtable(((SSelectStmt*)pCxt->pTranslateCxt->pCurrStmt)->pFromTable) ||
|
||||
hasTbnameFunction(((SSelectStmt*)pCxt->pTranslateCxt->pCurrStmt)->pPartitionByList)) {
|
||||
partionByTbname = true;
|
||||
}
|
||||
FOREACH(pPartKey, ((SSelectStmt*)pCxt->pTranslateCxt->pCurrStmt)->pPartitionByList) {
|
||||
if (nodesEqualNode(pPartKey, *pNode)) {
|
||||
return rewriteExprToGroupKeyFunc(pCxt->pTranslateCxt, pNode);
|
||||
}
|
||||
}
|
||||
if (partionByTbname && QUERY_NODE_COLUMN == nodeType(*pNode) && ((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) {
|
||||
return rewriteExprToGroupKeyFunc(pCxt->pTranslateCxt, pNode);
|
||||
}
|
||||
if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) {
|
||||
pCxt->existCol = true;
|
||||
}
|
||||
|
@ -3954,22 +3995,12 @@ static int32_t checkStateExpr(STranslateContext* pCxt, SNode* pNode) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool hasPartitionByTbname(SNodeList* pPartitionByList) {
|
||||
SNode* pPartKey = NULL;
|
||||
FOREACH(pPartKey, pPartitionByList) {
|
||||
if (QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||
if (!pCxt->createStream) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
|
||||
!hasPartitionByTbname(pSelect->pPartitionByList)) {
|
||||
!hasTbnameFunction(pSelect->pPartitionByList)) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -7559,12 +7590,12 @@ static int32_t translateKillTransaction(STranslateContext* pCxt, SKillStmt* pStm
|
|||
static bool crossTableWithoutAggOper(SSelectStmt* pSelect) {
|
||||
return NULL == pSelect->pWindow && !pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc &&
|
||||
!pSelect->hasInterpFunc && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
|
||||
!hasPartitionByTbname(pSelect->pPartitionByList);
|
||||
!hasTbnameFunction(pSelect->pPartitionByList);
|
||||
}
|
||||
|
||||
static bool crossTableWithUdaf(SSelectStmt* pSelect) {
|
||||
return pSelect->hasUdaf && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
|
||||
!hasPartitionByTbname(pSelect->pPartitionByList);
|
||||
!hasTbnameFunction(pSelect->pPartitionByList);
|
||||
}
|
||||
|
||||
static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
|
||||
|
@ -7822,7 +7853,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
|
|||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||
if ( (SRealTableNode*)pSelect->pFromTable && ((SRealTableNode*)pSelect->pFromTable)->pMeta
|
||||
&& TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType
|
||||
&& !hasPartitionByTbname(pSelect->pPartitionByList)
|
||||
&& !hasTbnameFunction(pSelect->pPartitionByList)
|
||||
&& pSelect->pWindow != NULL && pSelect->pWindow->type == QUERY_NODE_EVENT_WINDOW) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
||||
"Event window for stream on super table must patitioned by table name");
|
||||
|
@ -7850,7 +7881,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
|
|||
if (pSelect->pWindow != NULL && pSelect->pWindow->type == QUERY_NODE_COUNT_WINDOW) {
|
||||
if ( (SRealTableNode*)pSelect->pFromTable && ((SRealTableNode*)pSelect->pFromTable)->pMeta
|
||||
&& TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType
|
||||
&& !hasPartitionByTbname(pSelect->pPartitionByList) ) {
|
||||
&& !hasTbnameFunction(pSelect->pPartitionByList) ) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
||||
"Count window for stream on super table must patitioned by table name");
|
||||
}
|
||||
|
|
|
@ -494,6 +494,9 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
} else if (pSelect->pPartitionByList) {
|
||||
isCountByTag = !keysHasCol(pSelect->pPartitionByList);
|
||||
}
|
||||
if (pScan->tableType == TSDB_CHILD_TABLE) {
|
||||
isCountByTag = true;
|
||||
}
|
||||
}
|
||||
pScan->isCountByTag = isCountByTag;
|
||||
|
||||
|
|
|
@ -66,10 +66,10 @@ class TDTestCase:
|
|||
tdSql.query('select * from (select tbname, avg(f) from st partition by tbname) a partition by a.tbname order by a.tbname');
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkCols(2)
|
||||
tdSql.checkData(0, 0, 'ct1');
|
||||
tdSql.checkData(0, 1, 6.0);
|
||||
tdSql.checkData(1, 0, 'ct2');
|
||||
tdSql.checkData(1, 1, 12.0);
|
||||
tdSql.checkData(0, 0, 'ct1')
|
||||
tdSql.checkData(0, 1, 6.0)
|
||||
tdSql.checkData(1, 0, 'ct2')
|
||||
tdSql.checkData(1, 1, 12.0)
|
||||
|
||||
tdSql.error('select tbname from (select * from st)')
|
||||
tdSql.error('select st.tbname from (select st.tbname from st)')
|
||||
|
|
|
@ -870,7 +870,7 @@ sql_error select stddev(c2), tbname from select_tags_mt0;
|
|||
sql_error select twa(c2), tbname from select_tags_mt0;
|
||||
sql_error select interp(c2), tbname from select_tags_mt0 where ts=100001;
|
||||
|
||||
sql_error select t1,t2,tbname from select_tags_mt0 group by tbname;
|
||||
|
||||
sql select count(tbname) from select_tags_mt0 interval(1d);
|
||||
sql select count(tbname) from select_tags_mt0 group by t1;
|
||||
sql select count(tbname),SUM(T1) from select_tags_mt0 interval(1d);
|
||||
|
@ -888,16 +888,16 @@ sql_error select tbname, t1 from select_tags_mt0 interval(1y);
|
|||
print ==================================>TD-4231
|
||||
sql select t1,tbname from select_tags_mt0 where c1<0
|
||||
sql select t1,tbname from select_tags_mt0 where c1<0 and tbname in ('select_tags_tb12')
|
||||
|
||||
sql select tbname from select_tags_mt0 where tbname in ('select_tags_tb12');
|
||||
|
||||
sql_error select first(c1), last(c2), t1 from select_tags_mt0 group by tbname;
|
||||
sql_error select first(c1), last(c2), tbname, t2 from select_tags_mt0 group by tbname;
|
||||
sql_error select first(c1), count(*), t2, t1, tbname from select_tags_mt0 group by tbname;
|
||||
#valid sql: select first(c1), t2 from select_tags_mt0 group by tbname;
|
||||
sql select first(ts), tbname from select_tags_mt0 group by tbname;
|
||||
sql select count(c1) from select_tags_mt0 where c1=99 group by tbname;
|
||||
sql select count(*),tbname from select_tags_mt0 group by tbname
|
||||
|
||||
#sql select first(ts), tbname from select_tags_mt0 group by tbname;
|
||||
#sql select count(c1) from select_tags_mt0 where c1=99 group by tbname;
|
||||
#sql select count(*),tbname from select_tags_mt0 group by tbname
|
||||
print ==================================> tag supported in group
|
||||
sql select t1,t2,tbname from select_tags_mt0 group by tbname;
|
||||
sql select first(c1), last(c2), t1 from select_tags_mt0 group by tbname;
|
||||
sql select first(c1), last(c2), tbname, t2 from select_tags_mt0 group by tbname;
|
||||
sql select first(c1), count(*), t2, t1, tbname from select_tags_mt0 group by tbname;
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -3,15 +3,24 @@ system sh/deploy.sh -n dnode1 -i 1
|
|||
system sh/exec.sh -n dnode1 -s start
|
||||
sql connect
|
||||
|
||||
sql create database test;
|
||||
sql create database test KEEP 36500;
|
||||
sql use test;
|
||||
sql create table st(ts timestamp, f int) tags(t int);
|
||||
sql insert into ct1 using st tags(1) values(now, 0)(now+1s, 1)(now+2s, 10)(now+3s, 11)
|
||||
sql insert into ct2 using st tags(2) values(now+2s, 2)(now+3s, 3)
|
||||
sql insert into ct3 using st tags(3) values(now+4s, 4)(now+5s, 5)
|
||||
sql insert into ct4 using st tags(4) values(now+6s, 6)(now+7s, 7)
|
||||
|
||||
sql select count(*), spread(ts) from st where tbname='ct1'
|
||||
$ms = 1712135244502
|
||||
$ms1 = $ms + 1000
|
||||
$ms2 = $ms + 2000
|
||||
$ms3 = $ms + 3000
|
||||
$ms4 = $ms + 4000
|
||||
$ms5 = $ms + 5000
|
||||
$ms6 = $ms + 6000
|
||||
$ms7 = $ms + 7000
|
||||
sql insert into ct1 using st tags(1) values($ms , 0)($ms1 , 1)($ms2 , 10)($ms3 , 11)
|
||||
sql insert into ct2 using st tags(2) values($ms2 , 2)($ms3 , 3)
|
||||
sql insert into ct3 using st tags(3) values($ms4 , 4)($ms5 , 5)
|
||||
sql insert into ct4 using st tags(4) values($ms6 , 6)($ms7 , 7)
|
||||
|
||||
sql select count(*), spread(ts) from st where tbname='ct1'
|
||||
print $data00, $data01
|
||||
if $data00 != @4@ then
|
||||
return -1
|
||||
|
|
|
@ -103,6 +103,10 @@ class TDTestCase:
|
|||
tdSql.checkRows(row)
|
||||
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} partition by tbname')
|
||||
tdSql.checkRows(row)
|
||||
tdSql.query(f'select t0, {function_name}(c1),sum(c1) from {self.stbname} partition by tbname')
|
||||
tdSql.checkRows(row)
|
||||
tdSql.query(f'select cast(t0 as binary(12)), {function_name}(c1),sum(c1) from {self.stbname} partition by tbname')
|
||||
tdSql.checkRows(row)
|
||||
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} partition by c1')
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} partition by t0')
|
||||
|
|
|
@ -470,7 +470,9 @@ class TDTestCase:
|
|||
tdSql.checkRows(40)
|
||||
|
||||
# bug need fix
|
||||
tdSql.query("select tbname , csum(c1), csum(c12) from db.stb1 partition by tbname")
|
||||
tdSql.query("select tbname , st1, csum(c1), csum(c12) from db.stb1 partition by tbname")
|
||||
tdSql.checkRows(40)
|
||||
tdSql.query("select tbname , cast(st1 as binary(24)), csum(c1), csum(c12) from db.stb1 partition by tbname")
|
||||
tdSql.checkRows(40)
|
||||
tdSql.query("select tbname , csum(st1) from db.stb1 partition by tbname")
|
||||
tdSql.checkRows(70)
|
||||
|
|
|
@ -91,15 +91,71 @@ class TDTestCase:
|
|||
tdSql.query(f"select t2, t3, c1, count(*) from {self.dbname}.{self.stable} {keyword} by t2, t3, c1 ")
|
||||
tdSql.checkRows(nonempty_tb_num * self.row_nums)
|
||||
|
||||
def test_groupby_sub_table(self):
|
||||
for i in range(self.tb_nums):
|
||||
tbname = f"{self.dbname}.sub_{self.stable}_{i}"
|
||||
ts = self.ts + i*10000
|
||||
tdSql.query(f"select t1, t2, t3,count(*) from {tbname}")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 1, i)
|
||||
tdSql.checkData(0, 2, i*10)
|
||||
|
||||
tdSql.query(f"select cast(t2 as binary(12)),count(*) from {tbname}")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, i)
|
||||
|
||||
tdSql.query(f"select t2 + 1, count(*) from {tbname}")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, i + 1)
|
||||
|
||||
tdSql.query(f"select t1, t2, t3, count(*) from {tbname} group by tbname")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 1, i)
|
||||
tdSql.checkData(0, 2, i*10)
|
||||
|
||||
tdSql.query(f"select t1, t2, t3, count(*) from {tbname} group by tbname, c1, t4")
|
||||
tdSql.checkData(0, 1, i)
|
||||
tdSql.checkData(0, 2, i*10)
|
||||
|
||||
tdSql.query(f"select t1, t2, t3, count(*) from {tbname} partition by tbname")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 1, i)
|
||||
tdSql.checkData(0, 2, i*10)
|
||||
|
||||
tdSql.query(f"select t1, t2, t3, count(*) from {tbname} partition by c1, tbname")
|
||||
tdSql.checkData(0, 1, i)
|
||||
tdSql.checkData(0, 2, i*10)
|
||||
|
||||
tdSql.query(f"select t1, t2, t3, count(*) from {self.dbname}.{self.stable} partition by c1, tbname order by tbname desc")
|
||||
tdSql.checkRows(50)
|
||||
tdSql.checkData(0, 1, 4)
|
||||
tdSql.checkData(0, 2, 40)
|
||||
|
||||
|
||||
def test_multi_group_key(self, check_num, nonempty_tb_num):
|
||||
# multi tag/tbname
|
||||
tdSql.query(f"select t2, t3, tbname, count(*) from {self.dbname}.{self.stable} group by t2, t3, tbname")
|
||||
tdSql.checkRows(check_num)
|
||||
|
||||
tdSql.query(f"select cast(t2 as binary(12)), count(*) from {self.dbname}.{self.stable} group by t2, t3, tbname")
|
||||
tdSql.checkRows(check_num)
|
||||
|
||||
tdSql.query(f"select t2, t3, tbname, count(*) from {self.dbname}.{self.stable} partition by t2, t3, tbname")
|
||||
tdSql.checkRows(check_num)
|
||||
|
||||
tdSql.query(f"select t2, t3, tbname, count(*) from {self.dbname}.{self.stable} group by tbname order by tbname asc")
|
||||
tdSql.checkRows(check_num)
|
||||
tdSql.checkData(0, 0, 0)
|
||||
tdSql.checkData(1, 0, 1)
|
||||
tdSql.checkData(2, 1, 20)
|
||||
tdSql.checkData(3, 1, 30)
|
||||
|
||||
tdSql.query(f"select t2, t3, tbname, count(*) from {self.dbname}.{self.stable} partition by tbname order by tbname asc")
|
||||
tdSql.checkRows(check_num)
|
||||
tdSql.checkData(0, 0, 0)
|
||||
tdSql.checkData(2, 1, 20)
|
||||
tdSql.checkData(3, 1, 30)
|
||||
|
||||
# multi tag + col
|
||||
tdSql.query(f"select t1, t2, c1, count(*) from {self.dbname}.{self.stable} partition by t1, t2, c1 ")
|
||||
tdSql.checkRows(nonempty_tb_num * self.row_nums)
|
||||
|
@ -222,12 +278,14 @@ class TDTestCase:
|
|||
|
||||
self.test_groupby('group', self.tb_nums, nonempty_tb_num)
|
||||
self.test_groupby('partition', self.tb_nums, nonempty_tb_num)
|
||||
self.test_groupby_sub_table()
|
||||
self.test_innerSelect(self.tb_nums)
|
||||
self.test_multi_group_key(self.tb_nums, nonempty_tb_num)
|
||||
self.test_multi_agg(self.tb_nums, nonempty_tb_num)
|
||||
self.test_window(nonempty_tb_num)
|
||||
self.test_event_window(nonempty_tb_num)
|
||||
|
||||
|
||||
## test old version before changed
|
||||
# self.test_groupby('group', 0, 0)
|
||||
# self.insert_db(5, self.row_nums)
|
||||
|
|
Loading…
Reference in New Issue