Merge pull request #19224 from taosdata/fix/TD-21561
fix(query): fix count/hyperloglog return additional row in group by when queryPolicy is set to 3.
This commit is contained in:
commit
4a142018da
|
@ -121,6 +121,7 @@ typedef struct SAggLogicNode {
|
|||
bool hasLast;
|
||||
bool hasTimeLineFunc;
|
||||
bool onlyHasKeepOrderFunc;
|
||||
bool hasGroupKeyOptimized;
|
||||
} SAggLogicNode;
|
||||
|
||||
typedef struct SProjectLogicNode {
|
||||
|
@ -402,6 +403,7 @@ typedef struct SAggPhysiNode {
|
|||
SNodeList* pGroupKeys;
|
||||
SNodeList* pAggFuncs;
|
||||
bool mergeDataBlock;
|
||||
bool groupKeyOptimized;
|
||||
} SAggPhysiNode;
|
||||
|
||||
typedef struct SDownstreamSourceNode {
|
||||
|
|
|
@ -83,6 +83,7 @@ typedef struct SAggOperatorInfo {
|
|||
uint64_t groupId;
|
||||
SGroupResInfo groupResInfo;
|
||||
SExprSupp scalarExprSup;
|
||||
bool groupKeyOptimized;
|
||||
} SAggOperatorInfo;
|
||||
|
||||
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
|
||||
|
@ -1354,6 +1355,11 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SAggOperatorInfo* pAggInfo = pOperator->info;
|
||||
if (pAggInfo->groupKeyOptimized) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
|
||||
(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
|
||||
|
@ -1755,6 +1761,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
|
|||
}
|
||||
|
||||
pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
|
||||
pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized;
|
||||
pInfo->groupId = UINT64_MAX;
|
||||
|
||||
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
|
||||
|
|
|
@ -540,7 +540,7 @@ int32_t countFunction(SqlFunctionCtx* pCtx) {
|
|||
if (IS_NULL_TYPE(type)) {
|
||||
// select count(NULL) returns 0
|
||||
numOfElem = 1;
|
||||
*((int64_t*)buf) = 0;
|
||||
*((int64_t*)buf) += 0;
|
||||
} else {
|
||||
numOfElem = getNumOfElems(pCtx);
|
||||
*((int64_t*)buf) += numOfElem;
|
||||
|
|
|
@ -402,6 +402,7 @@ static int32_t logicAggCopy(const SAggLogicNode* pSrc, SAggLogicNode* pDst) {
|
|||
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
|
||||
CLONE_NODE_LIST_FIELD(pGroupKeys);
|
||||
CLONE_NODE_LIST_FIELD(pAggFuncs);
|
||||
COPY_SCALAR_FIELD(hasGroupKeyOptimized);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -1829,6 +1829,7 @@ static const char* jkAggPhysiPlanExprs = "Exprs";
|
|||
static const char* jkAggPhysiPlanGroupKeys = "GroupKeys";
|
||||
static const char* jkAggPhysiPlanAggFuncs = "AggFuncs";
|
||||
static const char* jkAggPhysiPlanMergeDataBlock = "MergeDataBlock";
|
||||
static const char* jkAggPhysiPlanGroupKeyOptimized = "GroupKeyOptimized";
|
||||
|
||||
static int32_t physiAggNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SAggPhysiNode* pNode = (const SAggPhysiNode*)pObj;
|
||||
|
@ -1846,6 +1847,9 @@ static int32_t physiAggNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkAggPhysiPlanMergeDataBlock, pNode->mergeDataBlock);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkAggPhysiPlanGroupKeyOptimized, pNode->groupKeyOptimized);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -1866,6 +1870,9 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkAggPhysiPlanMergeDataBlock, &pNode->mergeDataBlock);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkAggPhysiPlanGroupKeyOptimized, &pNode->groupKeyOptimized);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -2370,7 +2370,8 @@ enum {
|
|||
PHY_AGG_CODE_EXPR,
|
||||
PHY_AGG_CODE_GROUP_KEYS,
|
||||
PHY_AGG_CODE_AGG_FUNCS,
|
||||
PHY_AGG_CODE_MERGE_DATA_BLOCK
|
||||
PHY_AGG_CODE_MERGE_DATA_BLOCK,
|
||||
PHY_AGG_CODE_GROUP_KEY_OPTIMIZE
|
||||
};
|
||||
|
||||
static int32_t physiAggNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
|
@ -2389,6 +2390,9 @@ static int32_t physiAggNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_AGG_CODE_MERGE_DATA_BLOCK, pNode->mergeDataBlock);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_AGG_CODE_GROUP_KEY_OPTIMIZE, pNode->groupKeyOptimized);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -2415,6 +2419,9 @@ static int32_t msgToPhysiAggNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case PHY_AGG_CODE_MERGE_DATA_BLOCK:
|
||||
code = tlvDecodeBool(pTlv, &pNode->mergeDataBlock);
|
||||
break;
|
||||
case PHY_AGG_CODE_GROUP_KEY_OPTIMIZE:
|
||||
code = tlvDecodeBool(pTlv, &pNode->groupKeyOptimized);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -545,6 +545,7 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
|
|||
pAgg->hasLastRow = pSelect->hasLastRowFunc;
|
||||
pAgg->hasLast = pSelect->hasLastFunc;
|
||||
pAgg->hasTimeLineFunc = pSelect->hasTimeLineFunc;
|
||||
pAgg->hasGroupKeyOptimized = false;
|
||||
pAgg->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc;
|
||||
pAgg->node.groupAction = getGroupAction(pCxt, pSelect);
|
||||
pAgg->node.requireDataOrder = getRequireDataOrder(pAgg->hasTimeLineFunc, pSelect);
|
||||
|
|
|
@ -1544,6 +1544,11 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
|||
code = adjustLogicNodeDataRequirement((SLogicNode*)pScan, pNode->resultDataOrder);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_AGG == pNode->pParent->type) {
|
||||
SAggLogicNode* pParent = (SAggLogicNode*)(pNode->pParent);
|
||||
pParent->hasGroupKeyOptimized = true;
|
||||
}
|
||||
|
||||
NODES_CLEAR_LIST(pNode->pChildren);
|
||||
nodesDestroyNode((SNode*)pNode);
|
||||
}
|
||||
|
@ -1569,6 +1574,8 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
|||
break;
|
||||
}
|
||||
}
|
||||
pAgg->hasGroupKeyOptimized = true;
|
||||
|
||||
NODES_DESTORY_LIST(pAgg->pGroupKeys);
|
||||
if (TSDB_CODE_SUCCESS == code && start >= 0) {
|
||||
code = partTagsRewriteGroupTagsToFuncs(pScan->pGroupTags, start, pAgg);
|
||||
|
@ -1577,6 +1584,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = partTagsOptRebuildTbanme(pScan->pGroupTags);
|
||||
}
|
||||
|
||||
pCxt->optimized = true;
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -872,6 +872,7 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
|
|||
}
|
||||
|
||||
pAgg->mergeDataBlock = (GROUP_ACTION_KEEP == pAggLogicNode->node.groupAction ? false : true);
|
||||
pAgg->groupKeyOptimized = pAggLogicNode->hasGroupKeyOptimized;
|
||||
|
||||
SNodeList* pPrecalcExprs = NULL;
|
||||
SNodeList* pGroupKeys = NULL;
|
||||
|
|
|
@ -732,6 +732,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-19201.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -n 3
|
||||
|
@ -830,6 +831,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/between.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distinct.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/varchar.py -Q 3
|
||||
|
@ -925,6 +927,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/between.py -Q 4
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distinct.py -Q 4
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/varchar.py -Q 4
|
||||
|
@ -1030,6 +1033,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 4
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 4
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-20582.py
|
||||
|
||||
#develop test
|
||||
|
|
|
@ -0,0 +1,148 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import random
|
||||
import os
|
||||
import time
|
||||
import taos
|
||||
import subprocess
|
||||
from faker import Faker
|
||||
from util.log import tdLog
|
||||
from util.cases import tdCases
|
||||
from util.sql import tdSql
|
||||
from util.dnodes import tdDnodes
|
||||
from util.dnodes import *
|
||||
|
||||
class TDTestCase:
|
||||
updatecfgDict = {'maxSQLLength':1048576,'debugFlag': 143 ,"querySmaOptimize":1}
|
||||
|
||||
def init(self, conn, logSql, replicaVar):
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
|
||||
self.testcasePath = os.path.split(__file__)[0]
|
||||
self.testcaseFilename = os.path.split(__file__)[-1]
|
||||
os.system("rm -rf %s/%s.sql" % (self.testcasePath,self.testcaseFilename))
|
||||
|
||||
self.db = "hyp"
|
||||
|
||||
def dropandcreateDB_random(self,database,n):
|
||||
ts = 1630000000000
|
||||
num_random = 10
|
||||
fake = Faker('zh_CN')
|
||||
tdSql.execute('''drop database if exists %s ;''' %database)
|
||||
tdSql.execute('''create database %s keep 36500 ;'''%(database))
|
||||
tdSql.execute('''use %s;'''%database)
|
||||
|
||||
tdSql.execute('''create stable %s.stable_1 (ts timestamp , q_int int , q_bigint bigint , q_smallint smallint , q_tinyint tinyint , q_float float , q_double double , q_bool bool , q_binary binary(100) , q_nchar nchar(100) , q_ts timestamp , \
|
||||
q_int_null int , q_bigint_null bigint , q_smallint_null smallint , q_tinyint_null tinyint, q_float_null float , q_double_null double , q_bool_null bool , q_binary_null binary(20) , q_nchar_null nchar(20) , q_ts_null timestamp) \
|
||||
tags(loc nchar(100) , t_int int , t_bigint bigint , t_smallint smallint , t_tinyint tinyint, t_bool bool , t_binary binary(100) , t_nchar nchar(100) ,t_float float , t_double double , t_ts timestamp);'''%database)
|
||||
tdSql.execute('''create stable %s.stable_2 (ts timestamp , q_int int , q_bigint bigint , q_smallint smallint , q_tinyint tinyint , q_float float , q_double double , q_bool bool , q_binary binary(100) , q_nchar nchar(100) , q_ts timestamp , \
|
||||
q_int_null int , q_bigint_null bigint , q_smallint_null smallint , q_tinyint_null tinyint, q_float_null float , q_double_null double , q_bool_null bool , q_binary_null binary(20) , q_nchar_null nchar(20) , q_ts_null timestamp) \
|
||||
tags(loc nchar(100) , t_int int , t_bigint bigint , t_smallint smallint , t_tinyint tinyint, t_bool bool , t_binary binary(100) , t_nchar nchar(100) ,t_float float , t_double double , t_ts timestamp);'''%database)
|
||||
|
||||
for i in range(num_random):
|
||||
tdSql.execute('''create table %s.table_%d \
|
||||
(ts timestamp , q_int int , q_bigint bigint , q_smallint smallint , q_tinyint tinyint , q_float float , q_double double , q_bool bool , q_binary binary(100) , q_nchar nchar(100) , q_ts timestamp ) ;'''%(database,i))
|
||||
tdSql.execute('''create table %s.stable_1_%d using %s.stable_1 tags('stable_1_%d', '%d' , '%d', '%d' , '%d' , 1 , 'binary1.%s' , 'nchar1.%s' , '%f', '%f' ,'%d') ;'''
|
||||
%(database,i,database,i,fake.random_int(min=-2147483647, max=2147483647, step=1), fake.random_int(min=-9223372036854775807, max=9223372036854775807, step=1),
|
||||
fake.random_int(min=-32767, max=32767, step=1) , fake.random_int(min=-127, max=127, step=1) ,
|
||||
fake.pystr() ,fake.pystr() ,fake.pyfloat(),fake.pyfloat(),fake.random_int(min=-2147483647, max=2147483647, step=1)))
|
||||
|
||||
# insert data
|
||||
for i in range(num_random):
|
||||
for j in range(n):
|
||||
tdSql.execute('''insert into %s.stable_1_%d (ts , q_int , q_bigint , q_smallint , q_tinyint , q_float , q_double, q_bool , q_binary , q_nchar, q_ts)\
|
||||
values(%d, %d, %d, %d, %d, %f, %f, 0, 'binary.%s', 'nchar.%s', %d) ;'''
|
||||
% (database,i,ts + i*1000 + j, fake.random_int(min=-2147483647, max=2147483647, step=1),
|
||||
fake.random_int(min=-9223372036854775807, max=9223372036854775807, step=1),
|
||||
fake.random_int(min=-32767, max=32767, step=1) , fake.random_int(min=-127, max=127, step=1) ,
|
||||
fake.pyfloat() , fake.pyfloat() , fake.pystr() , fake.pystr() , ts + i))
|
||||
|
||||
tdSql.execute('''insert into %s.table_%d (ts , q_int , q_bigint , q_smallint , q_tinyint , q_float , q_double , q_bool , q_binary , q_nchar, q_ts) \
|
||||
values(%d, %d, %d, %d, %d, %f, %f, 0, 'binary.%s', 'nchar.%s', %d) ;'''
|
||||
% (database,i,ts + i*1000 + j, fake.random_int(min=-2147483647, max=2147483647, step=1),
|
||||
fake.random_int(min=-9223372036854775807, max=9223372036854775807, step=1),
|
||||
fake.random_int(min=-32767, max=32767, step=1) , fake.random_int(min=-127, max=127, step=1) ,
|
||||
fake.pyfloat() , fake.pyfloat() , fake.pystr() , fake.pystr() , ts + i ))
|
||||
|
||||
tdSql.query("select count(*) from %s.stable_1;" %database)
|
||||
tdSql.checkData(0,0,num_random*n)
|
||||
tdSql.query("select count(*) from %s.table_0;"%database)
|
||||
tdSql.checkData(0,0,n)
|
||||
|
||||
|
||||
|
||||
def sqls(self,database,function):
|
||||
sql0 = f"select {function}(q_tinyint) from {database}.stable_1 where tbname in ('stable_1_1') group by tbname ;"
|
||||
|
||||
sql1 = f"select {function}(q_tinyint) from {database}.stable_1 where tbname in ('stable_1_1') group by tbname;"
|
||||
|
||||
sql2 = f"select {function}(q_tinyint) from {database}.stable_1 where tbname in ('stable_1_1') and _C0 is not null and _C0 between 1600000000000 and now +1h and (q_nchar like 'nchar%' or q_binary = '0' or q_nchar = 'nchar_' ) and q_bool in (0 , 1) group by tbname ;"
|
||||
|
||||
sql3 = f"select * from (select {function}(q_tinyint) from {database}.stable_1 where tbname in ('stable_1_1') and _C0 is not null and _C0 between 1600000000000 and now +1h and (q_nchar like 'nchar%' or q_binary = '0' or q_nchar = 'nchar_' ) and q_bool in (0 , 1) group by tbname) ;"
|
||||
|
||||
self.constant_check(sql1,sql0)
|
||||
self.constant_check(sql1,sql2)
|
||||
self.constant_check(sql2,sql3)
|
||||
|
||||
def check_flushdb(self,database):
|
||||
functions = ['COUNT', 'HYPERLOGLOG']
|
||||
for f in functions:
|
||||
for i in range(20):
|
||||
self.sqls(database, f);
|
||||
|
||||
tdSql.execute(" flush database %s;" %database)
|
||||
tdLog.info("flush database success")
|
||||
|
||||
for i in range(20):
|
||||
self.sqls(database, f);
|
||||
|
||||
|
||||
def constant_check(self,sql1,sql2):
|
||||
tdLog.info("\n=============sql1:(%s)___sql2:(%s) ====================\n" %(sql1,sql2))
|
||||
tdSql.query(sql1)
|
||||
sql1_value = tdSql.getData(0,0)
|
||||
tdSql.query("reset query cache")
|
||||
tdSql.query(sql2)
|
||||
sql2_value = tdSql.getData(0,0)
|
||||
self.value_check(sql1_value,sql2_value)
|
||||
|
||||
def value_check(self,base_value,check_value):
|
||||
if base_value==check_value:
|
||||
tdLog.info(f"checkEqual success, base_value={base_value},check_value={check_value}")
|
||||
else :
|
||||
tdLog.exit(f"checkEqual error, base_value=={base_value},check_value={check_value}")
|
||||
|
||||
def run(self):
|
||||
|
||||
startTime = time.time()
|
||||
|
||||
os.system("rm -rf %s/%s.sql" % (self.testcasePath,self.testcaseFilename))
|
||||
|
||||
self.dropandcreateDB_random("%s" %self.db, 100)
|
||||
|
||||
self.check_flushdb("%s" %self.db)
|
||||
|
||||
endTime = time.time()
|
||||
print("total time %ds" % (endTime - startTime))
|
||||
|
||||
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
Loading…
Reference in New Issue