Merge branch 'd3N' into auth
This commit is contained in:
commit
b7cb019d69
|
@ -85,13 +85,19 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) {
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t rewriteExpr(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) {
|
static int32_t rewriteExprForSelect(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) {
|
||||||
nodesWalkExprs(pExprs, doNameExpr, NULL);
|
nodesWalkExprs(pExprs, doNameExpr, NULL);
|
||||||
SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs };
|
SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs };
|
||||||
nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt);
|
nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt);
|
||||||
return cxt.errCode;
|
return cxt.errCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t rewriteExprs(SNodeList* pExprs, SNodeList* pTarget) {
|
||||||
|
SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs };
|
||||||
|
nodesRewriteExprs(pTarget, doRewriteExpr, &cxt);
|
||||||
|
return cxt.errCode;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t pushLogicNode(SLogicPlanContext* pCxt, SLogicNode** pOldRoot, SLogicNode* pNewRoot) {
|
static int32_t pushLogicNode(SLogicPlanContext* pCxt, SLogicNode** pOldRoot, SLogicNode* pNewRoot) {
|
||||||
if (NULL == pNewRoot->pChildren) {
|
if (NULL == pNewRoot->pChildren) {
|
||||||
pNewRoot->pChildren = nodesMakeList();
|
pNewRoot->pChildren = nodesMakeList();
|
||||||
|
@ -433,10 +439,10 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
|
||||||
|
|
||||||
// rewrite the expression in subsequent clauses
|
// rewrite the expression in subsequent clauses
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = rewriteExpr(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY);
|
code = rewriteExprForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = rewriteExpr(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY);
|
code = rewriteExprForSelect(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) {
|
||||||
|
@ -472,7 +478,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = rewriteExpr(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW);
|
code = rewriteExprForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -723,7 +729,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
|
||||||
|
|
||||||
// rewrite the expression in subsequent clauses
|
// rewrite the expression in subsequent clauses
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = rewriteExpr(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_DISTINCT);
|
code = rewriteExprForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_DISTINCT);
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the output
|
// set the output
|
||||||
|
@ -850,6 +856,37 @@ static int32_t createSetOpProjectLogicNode(SLogicPlanContext* pCxt, SSetOperator
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t createSetOpAggLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) {
|
||||||
|
SAggLogicNode* pAgg = (SAggLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_AGG);
|
||||||
|
if (NULL == pAgg) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
pAgg->pGroupKeys = nodesCloneList(pSetOperator->pProjectionList);
|
||||||
|
if (NULL == pAgg->pGroupKeys) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
// rewrite the expression in subsequent clauses
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = rewriteExprs(pAgg->pGroupKeys, pSetOperator->pOrderByList);
|
||||||
|
}
|
||||||
|
|
||||||
|
// set the output
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = createColumnByRewriteExps(pCxt, pAgg->pGroupKeys, &pAgg->node.pTargets);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
*pLogicNode = (SLogicNode*)pAgg;
|
||||||
|
} else {
|
||||||
|
nodesDestroyNode(pAgg);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) {
|
static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) {
|
||||||
SLogicNode* pSetOp = NULL;
|
SLogicNode* pSetOp = NULL;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -857,6 +894,9 @@ static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetO
|
||||||
case SET_OP_TYPE_UNION_ALL:
|
case SET_OP_TYPE_UNION_ALL:
|
||||||
code = createSetOpProjectLogicNode(pCxt, pSetOperator, &pSetOp);
|
code = createSetOpProjectLogicNode(pCxt, pSetOperator, &pSetOp);
|
||||||
break;
|
break;
|
||||||
|
case SET_OP_TYPE_UNION:
|
||||||
|
code = createSetOpAggLogicNode(pCxt, pSetOperator, &pSetOp);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
code = -1;
|
code = -1;
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -50,6 +50,11 @@ typedef struct SUaInfo {
|
||||||
SLogicSubplan* pSubplan;
|
SLogicSubplan* pSubplan;
|
||||||
} SUaInfo;
|
} SUaInfo;
|
||||||
|
|
||||||
|
typedef struct SUnInfo {
|
||||||
|
SAggLogicNode* pAgg;
|
||||||
|
SLogicSubplan* pSubplan;
|
||||||
|
} SUnInfo;
|
||||||
|
|
||||||
typedef bool (*FSplFindSplitNode)(SLogicSubplan* pSubplan, void* pInfo);
|
typedef bool (*FSplFindSplitNode)(SLogicSubplan* pSubplan, void* pInfo);
|
||||||
|
|
||||||
static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan, int32_t flag) {
|
static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan, int32_t flag) {
|
||||||
|
@ -226,7 +231,6 @@ static SLogicSubplan* uaCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) {
|
||||||
pSubplan->id.groupId = pCxt->groupId;
|
pSubplan->id.groupId = pCxt->groupId;
|
||||||
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
|
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
|
||||||
pSubplan->pNode = pNode;
|
pSubplan->pNode = pNode;
|
||||||
// TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList, SVgroupsInfo*);
|
|
||||||
return pSubplan;
|
return pSubplan;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -244,24 +248,22 @@ static int32_t uaCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan
|
||||||
|
|
||||||
pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
|
pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
|
||||||
|
|
||||||
return nodesListMakeAppend(&pProject->node.pChildren, (SNode*)pExchange);
|
if (NULL == pProject->node.pParent) {
|
||||||
|
pSubplan->pNode = (SLogicNode*)pExchange;
|
||||||
|
nodesDestroyNode(pProject);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
// if (NULL == pProject->node.pParent) {
|
SNode* pNode;
|
||||||
// pSubplan->pNode = (SLogicNode*)pExchange;
|
FOREACH(pNode, pProject->node.pParent->pChildren) {
|
||||||
// nodesDestroyNode(pProject);
|
if (nodesEqualNode(pNode, pProject)) {
|
||||||
// return TSDB_CODE_SUCCESS;
|
REPLACE_NODE(pExchange);
|
||||||
// }
|
nodesDestroyNode(pNode);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
// SNode* pNode;
|
}
|
||||||
// FOREACH(pNode, pProject->node.pParent->pChildren) {
|
}
|
||||||
// if (nodesEqualNode(pNode, pProject)) {
|
nodesDestroyNode(pExchange);
|
||||||
// REPLACE_NODE(pExchange);
|
return TSDB_CODE_FAILED;
|
||||||
// nodesDestroyNode(pNode);
|
|
||||||
// return TSDB_CODE_SUCCESS;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// nodesDestroyNode(pExchange);
|
|
||||||
// return TSDB_CODE_FAILED;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
||||||
|
@ -291,10 +293,78 @@ static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SLogicNode* unMatchByNode(SLogicNode* pNode) {
|
||||||
|
if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
|
||||||
|
return pNode;
|
||||||
|
}
|
||||||
|
SNode* pChild;
|
||||||
|
FOREACH(pChild, pNode->pChildren) {
|
||||||
|
SLogicNode* pSplitNode = uaMatchByNode((SLogicNode*)pChild);
|
||||||
|
if (NULL != pSplitNode) {
|
||||||
|
return pSplitNode;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t unCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SAggLogicNode* pAgg) {
|
||||||
|
SExchangeLogicNode* pExchange = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
|
||||||
|
if (NULL == pExchange) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
pExchange->srcGroupId = pCxt->groupId;
|
||||||
|
// pExchange->precision = pScan->pMeta->tableInfo.precision;
|
||||||
|
pExchange->node.pTargets = nodesCloneList(pAgg->node.pTargets);
|
||||||
|
if (NULL == pExchange->node.pTargets) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
|
||||||
|
|
||||||
|
return nodesListMakeAppend(&pAgg->node.pChildren, pExchange);
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool unFindSplitNode(SLogicSubplan* pSubplan, SUnInfo* pInfo) {
|
||||||
|
SLogicNode* pSplitNode = unMatchByNode(pSubplan->pNode);
|
||||||
|
if (NULL != pSplitNode) {
|
||||||
|
pInfo->pAgg = (SAggLogicNode*)pSplitNode;
|
||||||
|
pInfo->pSubplan = pSubplan;
|
||||||
|
}
|
||||||
|
return NULL != pSplitNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t unSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
||||||
|
SUnInfo info = {0};
|
||||||
|
if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unFindSplitNode, &info)) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
SNode* pChild = NULL;
|
||||||
|
FOREACH(pChild, info.pAgg->node.pChildren) {
|
||||||
|
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, uaCreateSubplan(pCxt, (SLogicNode*)pChild));
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
REPLACE_NODE(NULL);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
nodesClearList(info.pAgg->node.pChildren);
|
||||||
|
info.pAgg->node.pChildren = NULL;
|
||||||
|
code = unCreateExchangeNode(pCxt, info.pSubplan, info.pAgg);
|
||||||
|
}
|
||||||
|
++(pCxt->groupId);
|
||||||
|
pCxt->split = true;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static const SSplitRule splitRuleSet[] = {
|
static const SSplitRule splitRuleSet[] = {
|
||||||
{ .pName = "SuperTableScan", .splitFunc = stsSplit },
|
{ .pName = "SuperTableScan", .splitFunc = stsSplit },
|
||||||
{ .pName = "ChildTableJoin", .splitFunc = ctjSplit },
|
{ .pName = "ChildTableJoin", .splitFunc = ctjSplit },
|
||||||
{ .pName = "UnionAll", .splitFunc = uaSplit },
|
{ .pName = "UnionAll", .splitFunc = uaSplit },
|
||||||
|
{ .pName = "Union", .splitFunc = unSplit }
|
||||||
};
|
};
|
||||||
|
|
||||||
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
|
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
|
||||||
|
|
|
@ -27,3 +27,9 @@ TEST_F(PlanSetOpTest, unionAll) {
|
||||||
|
|
||||||
run("select c1, c2 from t1 where c1 > 10 union all select c1, c2 from t1 where c1 > 20");
|
run("select c1, c2 from t1 where c1 > 10 union all select c1, c2 from t1 where c1 > 20");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(PlanSetOpTest, union) {
|
||||||
|
useDb("root", "test");
|
||||||
|
|
||||||
|
run("select c1, c2 from t1 where c1 > 10 union select c1, c2 from t1 where c1 > 20");
|
||||||
|
}
|
||||||
|
|
|
@ -62,7 +62,7 @@ public:
|
||||||
doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan);
|
doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan);
|
||||||
|
|
||||||
SQueryPlan* pPlan = nullptr;
|
SQueryPlan* pPlan = nullptr;
|
||||||
doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan, NULL);
|
doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan);
|
||||||
|
|
||||||
if (g_isDump) {
|
if (g_isDump) {
|
||||||
dump();
|
dump();
|
||||||
|
@ -162,7 +162,8 @@ private:
|
||||||
res_.scaledLogicPlan_ = toString((SNode*)(*pLogicPlan));
|
res_.scaledLogicPlan_ = toString((SNode*)(*pLogicPlan));
|
||||||
}
|
}
|
||||||
|
|
||||||
void doCreatePhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) {
|
void doCreatePhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan) {
|
||||||
|
SArray* pExecNodeList = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SQueryNodeAddr));
|
||||||
DO_WITH_THROW(createPhysiPlan, pCxt, pLogicPlan, pPlan, pExecNodeList);
|
DO_WITH_THROW(createPhysiPlan, pCxt, pLogicPlan, pPlan, pExecNodeList);
|
||||||
res_.physiPlan_ = toString((SNode*)(*pPlan));
|
res_.physiPlan_ = toString((SNode*)(*pPlan));
|
||||||
SNode* pNode;
|
SNode* pNode;
|
||||||
|
|
Loading…
Reference in New Issue