diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 6cb83ebb51..8e375e9da8 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -156,6 +156,7 @@ typedef struct SProjectLogicNode { SNodeList* pProjections; char stmtName[TSDB_TABLE_NAME_LEN]; bool ignoreGroupId; + bool inputIgnoreGroup; } SProjectLogicNode; typedef struct SIndefRowsFuncLogicNode { @@ -449,6 +450,7 @@ typedef struct SProjectPhysiNode { SNodeList* pProjections; bool mergeDataBlock; bool ignoreGroupId; + bool inputIgnoreGroup; } SProjectPhysiNode; typedef struct SIndefRowsFuncPhysiNode { diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index d965f16862..cc83ecd84e 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -27,6 +27,7 @@ typedef struct SProjectOperatorInfo { SLimitInfo limitInfo; bool mergeDataBlocks; SSDataBlock* pFinalRes; + bool inputIgnoreGroup; } SProjectOperatorInfo; typedef struct SIndefOperatorInfo { @@ -109,7 +110,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pInfo->pFinalRes = createOneDataBlock(pResBlock, false); pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder; pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder; - + pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup; + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM || pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { pInfo->mergeDataBlocks = false; } else { @@ -300,6 +302,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { return pBlock; } + if (pProjectInfo->inputIgnoreGroup) { + pBlock->info.id.groupId = 0; + } + int32_t status = discardGroupDataBlock(pBlock, pLimitInfo); if (status == PROJECT_RETRIEVE_CONTINUE) { continue; diff --git a/source/libs/function/inc/fnLog.h b/source/libs/function/inc/fnLog.h index 9658b6b782..150d145f50 100644 --- a/source/libs/function/inc/fnLog.h +++ b/source/libs/function/inc/fnLog.h @@ -1,6 +1,17 @@ -// -// Created by slzhou on 22-4-20. -// +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ #ifndef TDENGINE_FNLOG_H #define TDENGINE_FNLOG_H diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index c090cb4b63..bc9839792c 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -486,6 +486,7 @@ static int32_t logicProjectCopy(const SProjectLogicNode* pSrc, SProjectLogicNode CLONE_NODE_LIST_FIELD(pProjections); COPY_CHAR_ARRAY_FIELD(stmtName); COPY_SCALAR_FIELD(ignoreGroupId); + COPY_SCALAR_FIELD(inputIgnoreGroup); return TSDB_CODE_SUCCESS; } @@ -728,6 +729,15 @@ static int32_t physiPartitionCopy(const SPartitionPhysiNode* pSrc, SPartitionPhy return TSDB_CODE_SUCCESS; } +static int32_t physiProjectCopy(const SProjectPhysiNode* pSrc, SProjectPhysiNode* pDst) { + COPY_BASE_OBJECT_FIELD(node, physiNodeCopy); + CLONE_NODE_LIST_FIELD(pProjections); + COPY_SCALAR_FIELD(mergeDataBlock); + COPY_SCALAR_FIELD(ignoreGroupId); + COPY_SCALAR_FIELD(inputIgnoreGroup); + return TSDB_CODE_SUCCESS; +} + static int32_t dataBlockDescCopy(const SDataBlockDescNode* pSrc, SDataBlockDescNode* pDst) { COPY_SCALAR_FIELD(dataBlockId); CLONE_NODE_LIST_FIELD(pSlots); @@ -959,6 +969,9 @@ SNode* nodesCloneNode(const SNode* pNode) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: code = physiPartitionCopy((const SPartitionPhysiNode*)pNode, (SPartitionPhysiNode*)pDst); break; + case QUERY_NODE_PHYSICAL_PLAN_PROJECT: + code = physiProjectCopy((const SProjectPhysiNode*)pNode, (SProjectPhysiNode*)pDst); + break; default: break; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 6696aab4c1..d36a1bd6b9 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -784,6 +784,7 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) { static const char* jkProjectLogicPlanProjections = "Projections"; static const char* jkProjectLogicPlanIgnoreGroupId = "IgnoreGroupId"; +static const char* jkProjectLogicPlanInputIgnoreGroup= "InputIgnoreGroup"; static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) { const SProjectLogicNode* pNode = (const SProjectLogicNode*)pObj; @@ -795,6 +796,9 @@ static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkProjectLogicPlanIgnoreGroupId, pNode->ignoreGroupId); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkProjectLogicPlanInputIgnoreGroup, pNode->inputIgnoreGroup); + } return code; } @@ -809,7 +813,9 @@ static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkProjectLogicPlanIgnoreGroupId, &pNode->ignoreGroupId); } - + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkProjectLogicPlanInputIgnoreGroup, &pNode->inputIgnoreGroup); + } return code; } @@ -2067,6 +2073,7 @@ static int32_t jsonToPhysiSysTableScanNode(const SJson* pJson, void* pObj) { static const char* jkProjectPhysiPlanProjections = "Projections"; static const char* jkProjectPhysiPlanMergeDataBlock = "MergeDataBlock"; static const char* jkProjectPhysiPlanIgnoreGroupId = "IgnoreGroupId"; +static const char* jkProjectPhysiPlanInputIgnoreGroup = "InputIgnoreGroup"; static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) { const SProjectPhysiNode* pNode = (const SProjectPhysiNode*)pObj; @@ -2081,7 +2088,9 @@ static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkProjectPhysiPlanIgnoreGroupId, pNode->ignoreGroupId); } - + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkProjectPhysiPlanInputIgnoreGroup, pNode->inputIgnoreGroup); + } return code; } @@ -2098,7 +2107,9 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkProjectPhysiPlanIgnoreGroupId, &pNode->ignoreGroupId); } - + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkProjectPhysiPlanInputIgnoreGroup, &pNode->inputIgnoreGroup); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 3cff7c76aa..3fd219b8d8 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2367,7 +2367,8 @@ enum { PHY_PROJECT_CODE_BASE_NODE = 1, PHY_PROJECT_CODE_PROJECTIONS, PHY_PROJECT_CODE_MERGE_DATA_BLOCK, - PHY_PROJECT_CODE_IGNORE_GROUP_ID + PHY_PROJECT_CODE_IGNORE_GROUP_ID, + PHY_PROJECT_CODE_INPUT_IGNORE_GROUP }; static int32_t physiProjectNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -2383,6 +2384,9 @@ static int32_t physiProjectNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeBool(pEncoder, PHY_PROJECT_CODE_IGNORE_GROUP_ID, pNode->ignoreGroupId); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_PROJECT_CODE_INPUT_IGNORE_GROUP, pNode->inputIgnoreGroup); + } return code; } @@ -2406,6 +2410,9 @@ static int32_t msgToPhysiProjectNode(STlvDecoder* pDecoder, void* pObj) { case PHY_PROJECT_CODE_IGNORE_GROUP_ID: code = tlvDecodeBool(pTlv, &pNode->ignoreGroupId); break; + case PHY_PROJECT_CODE_INPUT_IGNORE_GROUP: + code = tlvDecodeBool(pTlv, &pNode->inputIgnoreGroup); + break; default: break; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 754d502a33..dfb9343cc8 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3122,6 +3122,7 @@ static bool mergeProjectsMayBeOptimized(SLogicNode* pNode) { NULL != pChild->pConditions || NULL != pChild->pLimit || NULL != pChild->pSlimit) { return false; } + return true; } @@ -3160,7 +3161,9 @@ static EDealRes mergeProjectionsExpr(SNode** pNode, void* pContext) { static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) { SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0); - + if (((SProjectLogicNode*)pChild)->ignoreGroupId) { + ((SProjectLogicNode*)pSelfNode)->inputIgnoreGroup = true; + } SMergeProjectionsContext cxt = {.pChildProj = (SProjectLogicNode*)pChild, .errCode = TSDB_CODE_SUCCESS}; nodesRewriteExprs(((SProjectLogicNode*)pSelfNode)->pProjections, mergeProjectionsExpr, &cxt); int32_t code = cxt.errCode; @@ -3325,7 +3328,11 @@ static bool pushDownLimitTo(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPu } case QUERY_NODE_LOGIC_PLAN_SCAN: if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT && pNodeWithLimit->pLimit) { - swapLimit(pNodeWithLimit, pNodeLimitPushTo); + if (((SProjectLogicNode*)pNodeWithLimit)->inputIgnoreGroup) { + cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_LIMIT); + } else { + swapLimit(pNodeWithLimit, pNodeLimitPushTo); + } return true; } default: diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 21c698fda5..21c637116f 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1459,6 +1459,7 @@ static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild pProject->mergeDataBlock = projectCanMergeDataBlock(pProjectLogicNode); pProject->ignoreGroupId = pProjectLogicNode->ignoreGroupId; + pProject->inputIgnoreGroup = pProjectLogicNode->inputIgnoreGroup; int32_t code = TSDB_CODE_SUCCESS; if (0 == LIST_LENGTH(pChildren)) { diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index ad0031f815..f2258a1d7e 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -1164,8 +1164,10 @@ static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) static int32_t stbSplGetSplitNodeForScan(SStableSplitInfo* pInfo, SLogicNode** pSplitNode) { *pSplitNode = pInfo->pSplitNode; - if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) && - NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit) { + if (NULL != pInfo->pSplitNode->pParent && + QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) && + NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit && + !((SProjectLogicNode*)pInfo->pSplitNode->pParent)->inputIgnoreGroup) { *pSplitNode = pInfo->pSplitNode->pParent; if (NULL != pInfo->pSplitNode->pLimit) { (*pSplitNode)->pLimit = nodesCloneNode(pInfo->pSplitNode->pLimit); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 1c2f5ec23c..694af127af 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -36,6 +36,7 @@ #,,n,system-test,python3 ./test.py -f 8-stream/snode_restart.py -N 4 ,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/project_group.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_interval.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/compact-col.py diff --git a/tests/system-test/2-query/project_group.py b/tests/system-test/2-query/project_group.py new file mode 100644 index 0000000000..44943e5088 --- /dev/null +++ b/tests/system-test/2-query/project_group.py @@ -0,0 +1,65 @@ +from wsgiref.headers import tspecials +from util.log import * +from util.cases import * +from util.sql import * +import numpy as np + + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + + self.rowNum = 10 + self.batchNum = 5 + self.ts = 1537146000000 + + def run(self): + dbname = "db" + tdSql.prepare() + + tdSql.execute(f'''create table sta(ts timestamp, col1 int, col2 bigint) tags(tg1 int, tg2 binary(20))''') + tdSql.execute(f"create table sta1 using sta tags(1, 'a')") + tdSql.execute(f"create table sta2 using sta tags(2, 'b')") + tdSql.execute(f"create table sta3 using sta tags(3, 'c')") + tdSql.execute(f"create table sta4 using sta tags(4, 'a')") + tdSql.execute(f"insert into sta1 values(1537146000001, 11, 110)") + tdSql.execute(f"insert into sta1 values(1537146000002, 12, 120)") + tdSql.execute(f"insert into sta1 values(1537146000003, 13, 130)") + tdSql.execute(f"insert into sta2 values(1537146000001, 21, 210)") + tdSql.execute(f"insert into sta2 values(1537146000002, 22, 220)") + tdSql.execute(f"insert into sta2 values(1537146000003, 23, 230)") + tdSql.execute(f"insert into sta3 values(1537146000001, 31, 310)") + tdSql.execute(f"insert into sta3 values(1537146000002, 32, 320)") + tdSql.execute(f"insert into sta3 values(1537146000003, 33, 330)") + tdSql.execute(f"insert into sta4 values(1537146000001, 41, 410)") + tdSql.execute(f"insert into sta4 values(1537146000002, 42, 420)") + tdSql.execute(f"insert into sta4 values(1537146000003, 43, 430)") + + tdSql.execute(f'''create table stb(ts timestamp, col1 int, col2 bigint) tags(tg1 int, tg2 binary(20))''') + tdSql.execute(f"create table stb1 using stb tags(1, 'a')") + tdSql.execute(f"create table stb2 using stb tags(2, 'b')") + tdSql.execute(f"create table stb3 using stb tags(3, 'c')") + tdSql.execute(f"create table stb4 using stb tags(4, 'a')") + tdSql.execute(f"insert into stb1 values(1537146000001, 911, 9110)") + tdSql.execute(f"insert into stb1 values(1537146000002, 912, 9120)") + tdSql.execute(f"insert into stb1 values(1537146000003, 913, 9130)") + tdSql.execute(f"insert into stb2 values(1537146000001, 921, 9210)") + tdSql.execute(f"insert into stb2 values(1537146000002, 922, 9220)") + tdSql.execute(f"insert into stb2 values(1537146000003, 923, 9230)") + tdSql.execute(f"insert into stb3 values(1537146000001, 931, 9310)") + tdSql.execute(f"insert into stb3 values(1537146000002, 932, 9320)") + tdSql.execute(f"insert into stb3 values(1537146000003, 933, 9330)") + tdSql.execute(f"insert into stb4 values(1537146000001, 941, 9410)") + tdSql.execute(f"insert into stb4 values(1537146000002, 942, 9420)") + tdSql.execute(f"insert into stb4 values(1537146000003, 943, 9430)") + + tdSql.query("select * from (select ts, col1 from sta partition by tbname) limit 2"); + tdSql.checkRows(2) + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase())