Merge pull request #11717 from taosdata/feature/3.0_wxy
feat: sql command 'union all'
This commit is contained in:
commit
6d413c9de7
|
@ -48,6 +48,9 @@ extern "C" {
|
|||
(NULL == cell1 ? (node1 = NULL, false) : (node1 = cell1->pNode, true)), (NULL == cell2 ? (node2 = NULL, false) : (node2 = cell2->pNode, true)), (node1 != NULL && node2 != NULL); \
|
||||
cell1 = cell1->pNext, cell2 = cell2->pNext)
|
||||
|
||||
#define REPLACE_LIST1_NODE(newNode) cell1->pNode = (SNode*)(newNode)
|
||||
#define REPLACE_LIST2_NODE(newNode) cell2->pNode = (SNode*)(newNode)
|
||||
|
||||
#define FOREACH_FOR_REWRITE(node, list) \
|
||||
for (SListCell* cell = (NULL != (list) ? (list)->pHead : NULL); (NULL != cell ? (node = &(cell->pNode), true) : (node = NULL, false)); cell = cell->pNext)
|
||||
|
||||
|
|
|
@ -155,7 +155,6 @@ typedef struct SLogicSubplan {
|
|||
|
||||
typedef struct SQueryLogicPlan {
|
||||
ENodeType type;
|
||||
int32_t totalLevel;
|
||||
SNodeList* pTopSubplans;
|
||||
} SQueryLogicPlan;
|
||||
|
||||
|
|
|
@ -229,10 +229,10 @@ typedef struct SFillNode {
|
|||
typedef struct SSelectStmt {
|
||||
ENodeType type; // QUERY_NODE_SELECT_STMT
|
||||
bool isDistinct;
|
||||
SNodeList* pProjectionList; // SNode
|
||||
SNodeList* pProjectionList;
|
||||
SNode* pFromTable;
|
||||
SNode* pWhere;
|
||||
SNodeList* pPartitionByList; // SNode
|
||||
SNodeList* pPartitionByList;
|
||||
SNode* pWindow;
|
||||
SNodeList* pGroupByList; // SGroupingSetNode
|
||||
SNode* pHaving;
|
||||
|
@ -245,12 +245,14 @@ typedef struct SSelectStmt {
|
|||
} SSelectStmt;
|
||||
|
||||
typedef enum ESetOperatorType {
|
||||
SET_OP_TYPE_UNION_ALL = 1
|
||||
SET_OP_TYPE_UNION_ALL = 1,
|
||||
SET_OP_TYPE_UNION
|
||||
} ESetOperatorType;
|
||||
|
||||
typedef struct SSetOperator {
|
||||
ENodeType type; // QUERY_NODE_SET_OPERATOR
|
||||
ESetOperatorType opType;
|
||||
SNodeList* pProjectionList;
|
||||
SNode* pLeft;
|
||||
SNode* pRight;
|
||||
SNodeList* pOrderByList; // SOrderByExprNode
|
||||
|
|
|
@ -614,6 +614,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_PAR_INTER_SLIDING_TOO_BIG TAOS_DEF_ERROR_CODE(0, 0x2631)
|
||||
#define TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL TAOS_DEF_ERROR_CODE(0, 0x2632)
|
||||
#define TSDB_CODE_PAR_ONLY_ONE_JSON_TAG TAOS_DEF_ERROR_CODE(0, 0x2633)
|
||||
#define TSDB_CODE_PAR_INCORRECT_NUM_OF_COL TAOS_DEF_ERROR_CODE(0, 0x2634)
|
||||
|
||||
//planner
|
||||
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
|
||||
|
|
|
@ -837,6 +837,8 @@ query_expression(A) ::=
|
|||
query_expression_body(A) ::= query_primary(B). { A = B; }
|
||||
query_expression_body(A) ::=
|
||||
query_expression_body(B) UNION ALL query_expression_body(D). { A = createSetOperator(pCxt, SET_OP_TYPE_UNION_ALL, B, D); }
|
||||
query_expression_body(A) ::=
|
||||
query_expression_body(B) UNION query_expression_body(D). { A = createSetOperator(pCxt, SET_OP_TYPE_UNION, B, D); }
|
||||
|
||||
query_primary(A) ::= query_specification(B). { A = B; }
|
||||
//query_primary(A) ::=
|
||||
|
|
|
@ -230,6 +230,21 @@ static int32_t initTranslateContext(SParseContext* pParseCxt, STranslateContext*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t resetTranslateNamespace(STranslateContext* pCxt) {
|
||||
if (NULL != pCxt->pNsLevel) {
|
||||
size_t size = taosArrayGetSize(pCxt->pNsLevel);
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
taosArrayDestroy(taosArrayGetP(pCxt->pNsLevel, i));
|
||||
}
|
||||
taosArrayDestroy(pCxt->pNsLevel);
|
||||
}
|
||||
pCxt->pNsLevel = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||
if (NULL == pCxt->pNsLevel) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void destroyTranslateContext(STranslateContext* pCxt) {
|
||||
if (NULL != pCxt->pNsLevel) {
|
||||
size_t size = taosArrayGetSize(pCxt->pNsLevel);
|
||||
|
@ -261,9 +276,11 @@ static bool belongTable(const char* currentDb, const SColumnNode* pCol, const ST
|
|||
return (0 == cmp);
|
||||
}
|
||||
|
||||
static SNodeList* getProjectList(SNode* pNode) {
|
||||
static SNodeList* getProjectList(const SNode* pNode) {
|
||||
if (QUERY_NODE_SELECT_STMT == nodeType(pNode)) {
|
||||
return ((SSelectStmt*)pNode)->pProjectionList;
|
||||
} else if (QUERY_NODE_SET_OPERATOR == nodeType(pNode)) {
|
||||
return ((SSetOperator*)pNode)->pProjectionList;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
@ -1353,13 +1370,77 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static SNode* createSetOperProject(SNode* pNode) {
|
||||
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
return NULL;
|
||||
}
|
||||
pCol->node.resType = ((SExprNode*)pNode)->resType;
|
||||
strcpy(pCol->colName, ((SExprNode*)pNode)->aliasName);
|
||||
strcpy(pCol->node.aliasName, pCol->colName);
|
||||
return (SNode*)pCol;
|
||||
}
|
||||
|
||||
static bool dataTypeEqual(const SDataType* l, const SDataType* r) {
|
||||
return (l->type == r->type && l->bytes == l->bytes && l->precision == r->precision && l->scale == l->scale);
|
||||
}
|
||||
|
||||
static int32_t createCastFunc(STranslateContext* pCxt, SNode* pExpr, SDataType dt, SNode** pCast) {
|
||||
SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||
if (NULL == pFunc) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
strcpy(pFunc->functionName, "cast");
|
||||
pFunc->node.resType = dt;
|
||||
if (TSDB_CODE_SUCCESS != nodesListMakeAppend(&pFunc->pParameterList, pExpr)) {
|
||||
nodesDestroyNode(pFunc);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (DEAL_RES_ERROR == translateFunction(pCxt, pFunc)) {
|
||||
nodesClearList(pFunc->pParameterList);
|
||||
pFunc->pParameterList = NULL;
|
||||
nodesDestroyNode(pFunc);
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)pExpr)->aliasName);
|
||||
}
|
||||
*pCast = (SNode*)pFunc;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateSetOperatorImpl(STranslateContext* pCxt, SSetOperator* pSetOperator) {
|
||||
// todo
|
||||
SNodeList* pLeftProjections = getProjectList(pSetOperator->pLeft);
|
||||
SNodeList* pRightProjections = getProjectList(pSetOperator->pRight);
|
||||
if (LIST_LENGTH(pLeftProjections) != LIST_LENGTH(pRightProjections)) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INCORRECT_NUM_OF_COL);
|
||||
}
|
||||
|
||||
SNode* pLeft = NULL;
|
||||
SNode* pRight = NULL;
|
||||
FORBOTH(pLeft, pLeftProjections, pRight, pRightProjections) {
|
||||
SExprNode* pLeftExpr = (SExprNode*)pLeft;
|
||||
SExprNode* pRightExpr = (SExprNode*)pRight;
|
||||
if (!dataTypeEqual(&pLeftExpr->resType, &pRightExpr->resType)) {
|
||||
SNode* pRightFunc = NULL;
|
||||
int32_t code = createCastFunc(pCxt, pRight, pLeftExpr->resType, &pRightFunc);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
REPLACE_LIST2_NODE(pRightFunc);
|
||||
pRightExpr = (SExprNode*)pRightFunc;
|
||||
}
|
||||
strcpy(pRightExpr->aliasName, pLeftExpr->aliasName);
|
||||
pRightExpr->aliasName[strlen(pLeftExpr->aliasName)] = '\0';
|
||||
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pSetOperator->pProjectionList, createSetOperProject(pLeft))) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateSetOperator(STranslateContext* pCxt, SSetOperator* pSetOperator) {
|
||||
int32_t code = translateQuery(pCxt, pSetOperator->pLeft);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = resetTranslateNamespace(pCxt);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateQuery(pCxt, pSetOperator->pRight);
|
||||
}
|
||||
|
@ -2794,8 +2875,8 @@ static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t extractSelectResultSchema(const SSelectStmt* pSelect, int32_t* numOfCols, SSchema** pSchema) {
|
||||
*numOfCols = LIST_LENGTH(pSelect->pProjectionList);
|
||||
static int32_t extractQueryResultSchema(const SNodeList* pProjections, int32_t* numOfCols, SSchema** pSchema) {
|
||||
*numOfCols = LIST_LENGTH(pProjections);
|
||||
*pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema));
|
||||
if (NULL == (*pSchema)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -2803,7 +2884,7 @@ static int32_t extractSelectResultSchema(const SSelectStmt* pSelect, int32_t* nu
|
|||
|
||||
SNode* pNode;
|
||||
int32_t index = 0;
|
||||
FOREACH(pNode, pSelect->pProjectionList) {
|
||||
FOREACH(pNode, pProjections) {
|
||||
SExprNode* pExpr = (SExprNode*)pNode;
|
||||
(*pSchema)[index].type = pExpr->resType.type;
|
||||
(*pSchema)[index].bytes = pExpr->resType.bytes;
|
||||
|
@ -2862,7 +2943,8 @@ int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pS
|
|||
|
||||
switch (nodeType(pRoot)) {
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
return extractSelectResultSchema((SSelectStmt*)pRoot, numOfCols, pSchema);
|
||||
case QUERY_NODE_SET_OPERATOR:
|
||||
return extractQueryResultSchema(getProjectList(pRoot), numOfCols, pSchema);
|
||||
case QUERY_NODE_EXPLAIN_STMT:
|
||||
return extractExplainResultSchema(numOfCols, pSchema);
|
||||
case QUERY_NODE_DESCRIBE_STMT:
|
||||
|
@ -3485,6 +3567,7 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
||||
switch (nodeType(pQuery->pRoot)) {
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
case QUERY_NODE_SET_OPERATOR:
|
||||
case QUERY_NODE_EXPLAIN_STMT:
|
||||
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||
pQuery->haveResultSet = true;
|
||||
|
|
|
@ -118,6 +118,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
|
|||
return "sliding value can not less than 1% of interval value";
|
||||
case TSDB_CODE_PAR_ONLY_ONE_JSON_TAG:
|
||||
return "Only one tag if there is a json tag";
|
||||
case TSDB_CODE_PAR_INCORRECT_NUM_OF_COL:
|
||||
return "Query block has incorrect number of result columns";
|
||||
case TSDB_CODE_OUT_OF_MEMORY:
|
||||
return "Out of memory";
|
||||
default:
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -22,6 +22,7 @@ typedef struct SLogicPlanContext {
|
|||
} SLogicPlanContext;
|
||||
|
||||
typedef int32_t (*FCreateLogicNode)(SLogicPlanContext*, SSelectStmt*, SLogicNode**);
|
||||
typedef int32_t (*FCreateSetOpLogicNode)(SLogicPlanContext*, SSetOperator*, SLogicNode**);
|
||||
|
||||
static int32_t doCreateLogicNodeByTable(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNode* pTable, SLogicNode** pLogicNode);
|
||||
static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogicNode** pLogicNode);
|
||||
|
@ -343,7 +344,9 @@ static SColumnNode* createColumnByExpr(const char* pStmtName, SExprNode* pExpr)
|
|||
}
|
||||
pCol->node.resType = pExpr->resType;
|
||||
strcpy(pCol->colName, pExpr->aliasName);
|
||||
strcpy(pCol->tableAlias, pStmtName);
|
||||
if (NULL != pStmtName) {
|
||||
strcpy(pCol->tableAlias, pStmtName);
|
||||
}
|
||||
return pCol;
|
||||
}
|
||||
|
||||
|
@ -768,11 +771,126 @@ static int32_t createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t createSetOpChildLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, FCreateSetOpLogicNode func, SLogicNode** pRoot) {
|
||||
SLogicNode* pNode = NULL;
|
||||
int32_t code = func(pCxt, pSetOperator, &pNode);
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pNode) {
|
||||
code = pushLogicNode(pCxt, pRoot, pNode);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyNode(pNode);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createSetOpSortLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) {
|
||||
if (NULL == pSetOperator->pOrderByList) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSortLogicNode* pSort = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SORT);
|
||||
if (NULL == pSort) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
pSort->node.pTargets = nodesCloneList(pSetOperator->pProjectionList);
|
||||
if (NULL == pSort->node.pTargets) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pSort->pSortKeys = nodesCloneList(pSetOperator->pOrderByList);
|
||||
if (NULL == pSort->pSortKeys) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicNode = (SLogicNode*)pSort;
|
||||
} else {
|
||||
nodesDestroyNode(pSort);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createSetOpProjectLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) {
|
||||
SProjectLogicNode* pProject = (SProjectLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_PROJECT);
|
||||
if (NULL == pProject) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (NULL != pSetOperator->pLimit) {
|
||||
pProject->limit = ((SLimitNode*)pSetOperator->pLimit)->limit;
|
||||
pProject->offset = ((SLimitNode*)pSetOperator->pLimit)->offset;
|
||||
} else {
|
||||
pProject->limit = -1;
|
||||
pProject->offset = -1;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
pProject->pProjections = nodesCloneList(pSetOperator->pProjectionList);
|
||||
if (NULL == pProject->pProjections) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createColumnByProjections(pCxt, NULL, pSetOperator->pProjectionList, &pProject->node.pTargets);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicNode = (SLogicNode*)pProject;
|
||||
} else {
|
||||
nodesDestroyNode(pProject);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) {
|
||||
SLogicNode* pSetOp = NULL;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (pSetOperator->opType) {
|
||||
case SET_OP_TYPE_UNION_ALL:
|
||||
code = createSetOpProjectLogicNode(pCxt, pSetOperator, &pSetOp);
|
||||
break;
|
||||
default:
|
||||
code = -1;
|
||||
break;
|
||||
}
|
||||
|
||||
SLogicNode* pLeft = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createQueryLogicNode(pCxt, pSetOperator->pLeft, &pLeft);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(&pSetOp->pChildren, (SNode*)pLeft);
|
||||
}
|
||||
SLogicNode* pRight = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createQueryLogicNode(pCxt, pSetOperator->pRight, &pRight);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListStrictAppend(pSetOp->pChildren, (SNode*)pRight);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicNode = (SLogicNode*)pSetOp;
|
||||
} else {
|
||||
nodesDestroyNode(pSetOp);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createSetOperatorLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) {
|
||||
SLogicNode* pRoot = NULL;
|
||||
int32_t code = createQueryLogicNode(pCxt, pSetOperator->pLeft, &pRoot);
|
||||
int32_t code = createSetOpLogicNode(pCxt, pSetOperator, &pRoot);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createQueryLogicNode(pCxt, pSetOperator->pRight, &pRoot);
|
||||
code = createSetOpChildLogicNode(pCxt, pSetOperator, createSetOpSortLogicNode, &pRoot);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
|
@ -38,7 +38,7 @@ typedef struct SPhysiPlanContext {
|
|||
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) {
|
||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||
if (NULL != pStmtName) {
|
||||
if (NULL != pStmtName && '\0' != pStmtName[0]) {
|
||||
return sprintf(pKey, "%s.%s", pStmtName, pCol->node.aliasName);
|
||||
}
|
||||
if ('\0' == pCol->tableAlias[0]) {
|
||||
|
@ -47,7 +47,7 @@ static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) {
|
|||
return sprintf(pKey, "%s.%s", pCol->tableAlias, pCol->colName);
|
||||
}
|
||||
|
||||
if (NULL != pStmtName) {
|
||||
if (NULL != pStmtName && '\0' != pStmtName[0]) {
|
||||
return sprintf(pKey, "%s.%s", pStmtName, ((SExprNode*)pNode)->aliasName);
|
||||
}
|
||||
return sprintf(pKey, "%s", ((SExprNode*)pNode)->aliasName);
|
||||
|
|
|
@ -129,7 +129,7 @@ static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurren
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t* pLevel, SNodeList* pParentsGroup) {
|
||||
static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pParentsGroup) {
|
||||
SNodeList* pCurrentGroup = nodesMakeList();
|
||||
if (NULL == pCurrentGroup) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -138,13 +138,13 @@ static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (pSubplan->subplanType) {
|
||||
case SUBPLAN_TYPE_MERGE:
|
||||
code = scaleOutForMerge(pCxt, pSubplan, *pLevel, pCurrentGroup);
|
||||
code = scaleOutForMerge(pCxt, pSubplan, level, pCurrentGroup);
|
||||
break;
|
||||
case SUBPLAN_TYPE_SCAN:
|
||||
code = scaleOutForScan(pCxt, pSubplan, *pLevel, pCurrentGroup);
|
||||
code = scaleOutForScan(pCxt, pSubplan, level, pCurrentGroup);
|
||||
break;
|
||||
case SUBPLAN_TYPE_MODIFY:
|
||||
code = scaleOutForModify(pCxt, pSubplan, *pLevel, pCurrentGroup);
|
||||
code = scaleOutForModify(pCxt, pSubplan, level, pCurrentGroup);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
@ -152,13 +152,12 @@ static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32
|
|||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = pushHierarchicalPlan(pParentsGroup, pCurrentGroup);
|
||||
++(*pLevel);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SNode* pChild;
|
||||
FOREACH(pChild, pSubplan->pChildren) {
|
||||
code = doScaleOut(pCxt, (SLogicSubplan*)pChild, pLevel, pCurrentGroup);
|
||||
code = doScaleOut(pCxt, (SLogicSubplan*)pChild, level + 1, pCurrentGroup);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
}
|
||||
|
@ -194,7 +193,7 @@ int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQue
|
|||
}
|
||||
|
||||
SScaleOutContext cxt = { .pPlanCxt = pCxt, .subplanId = 1 };
|
||||
int32_t code = doScaleOut(&cxt, pLogicSubplan, &(pPlan->totalLevel), pPlan->pTopSubplans);
|
||||
int32_t code = doScaleOut(&cxt, pLogicSubplan, 0, pPlan->pTopSubplans);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicPlan = pPlan;
|
||||
} else {
|
||||
|
|
|
@ -45,7 +45,12 @@ typedef struct SCtjInfo {
|
|||
SLogicSubplan* pSubplan;
|
||||
} SCtjInfo;
|
||||
|
||||
typedef bool (*FSplFindSplitNode)(SLogicSubplan* pSubplan, SStsInfo* pInfo);
|
||||
typedef struct SUaInfo {
|
||||
SProjectLogicNode* pProject;
|
||||
SLogicSubplan* pSubplan;
|
||||
} SUaInfo;
|
||||
|
||||
typedef bool (*FSplFindSplitNode)(SLogicSubplan* pSubplan, void* pInfo);
|
||||
|
||||
static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan, int32_t flag) {
|
||||
SLogicSubplan* pSubplan = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
|
||||
|
@ -132,16 +137,10 @@ static bool stsFindSplitNode(SLogicSubplan* pSubplan, SStsInfo* pInfo) {
|
|||
|
||||
static int32_t stsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
||||
SStsInfo info = {0};
|
||||
if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_STS, stsFindSplitNode, &info)) {
|
||||
if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_STS, (FSplFindSplitNode)stsFindSplitNode, &info)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (NULL == info.pSubplan->pChildren) {
|
||||
info.pSubplan->pChildren = nodesMakeList();
|
||||
if (NULL == info.pSubplan->pChildren) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
int32_t code = nodesListStrictAppend(info.pSubplan->pChildren, splCreateScanSubplan(pCxt, info.pScan, SPLIT_FLAG_STS));
|
||||
int32_t code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, splCreateScanSubplan(pCxt, info.pScan, SPLIT_FLAG_STS));
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = splCreateExchangeNode(pCxt, info.pSubplan, info.pScan, SUBPLAN_TYPE_MERGE);
|
||||
}
|
||||
|
@ -173,7 +172,7 @@ static SLogicNode* ctjMatchByNode(SLogicNode* pNode) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static bool ctjFindSplitNode(SLogicSubplan* pSubplan, SStsInfo* pInfo) {
|
||||
static bool ctjFindSplitNode(SLogicSubplan* pSubplan, SCtjInfo* pInfo) {
|
||||
SLogicNode* pSplitNode = ctjMatchByNode(pSubplan->pNode);
|
||||
if (NULL != pSplitNode) {
|
||||
pInfo->pScan = (SScanLogicNode*)pSplitNode;
|
||||
|
@ -184,16 +183,10 @@ static bool ctjFindSplitNode(SLogicSubplan* pSubplan, SStsInfo* pInfo) {
|
|||
|
||||
static int32_t ctjSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
||||
SCtjInfo info = {0};
|
||||
if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_CTJ, ctjFindSplitNode, &info)) {
|
||||
if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_CTJ, (FSplFindSplitNode)ctjFindSplitNode, &info)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (NULL == info.pSubplan->pChildren) {
|
||||
info.pSubplan->pChildren = nodesMakeList();
|
||||
if (NULL == info.pSubplan->pChildren) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
int32_t code = nodesListStrictAppend(info.pSubplan->pChildren, splCreateScanSubplan(pCxt, info.pScan, SPLIT_FLAG_CTJ));
|
||||
int32_t code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, splCreateScanSubplan(pCxt, info.pScan, SPLIT_FLAG_CTJ));
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = splCreateExchangeNode(pCxt, info.pSubplan, info.pScan, info.pSubplan->subplanType);
|
||||
}
|
||||
|
@ -202,9 +195,106 @@ static int32_t ctjSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static SLogicNode* uaMatchByNode(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
|
||||
return pNode;
|
||||
}
|
||||
SNode* pChild;
|
||||
FOREACH(pChild, pNode->pChildren) {
|
||||
SLogicNode* pSplitNode = uaMatchByNode((SLogicNode*)pChild);
|
||||
if (NULL != pSplitNode) {
|
||||
return pSplitNode;
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static bool uaFindSplitNode(SLogicSubplan* pSubplan, SUaInfo* pInfo) {
|
||||
SLogicNode* pSplitNode = uaMatchByNode(pSubplan->pNode);
|
||||
if (NULL != pSplitNode) {
|
||||
pInfo->pProject = (SProjectLogicNode*)pSplitNode;
|
||||
pInfo->pSubplan = pSubplan;
|
||||
}
|
||||
return NULL != pSplitNode;
|
||||
}
|
||||
|
||||
static SLogicSubplan* uaCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) {
|
||||
SLogicSubplan* pSubplan = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
|
||||
if (NULL == pSubplan) {
|
||||
return NULL;
|
||||
}
|
||||
pSubplan->id.groupId = pCxt->groupId;
|
||||
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
|
||||
pSubplan->pNode = pNode;
|
||||
// TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList, SVgroupsInfo*);
|
||||
return pSubplan;
|
||||
}
|
||||
|
||||
static int32_t uaCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SProjectLogicNode* pProject) {
|
||||
SExchangeLogicNode* pExchange = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
|
||||
if (NULL == pExchange) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pExchange->srcGroupId = pCxt->groupId;
|
||||
// pExchange->precision = pScan->pMeta->tableInfo.precision;
|
||||
pExchange->node.pTargets = nodesCloneList(pProject->node.pTargets);
|
||||
if (NULL == pExchange->node.pTargets) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
|
||||
|
||||
return nodesListMakeAppend(&pProject->node.pChildren, (SNode*)pExchange);
|
||||
|
||||
// if (NULL == pProject->node.pParent) {
|
||||
// pSubplan->pNode = (SLogicNode*)pExchange;
|
||||
// nodesDestroyNode(pProject);
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// }
|
||||
|
||||
// SNode* pNode;
|
||||
// FOREACH(pNode, pProject->node.pParent->pChildren) {
|
||||
// if (nodesEqualNode(pNode, pProject)) {
|
||||
// REPLACE_NODE(pExchange);
|
||||
// nodesDestroyNode(pNode);
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// }
|
||||
// }
|
||||
// nodesDestroyNode(pExchange);
|
||||
// return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
||||
SUaInfo info = {0};
|
||||
if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)uaFindSplitNode, &info)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SNode* pChild = NULL;
|
||||
FOREACH(pChild, info.pProject->node.pChildren) {
|
||||
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, uaCreateSubplan(pCxt, (SLogicNode*)pChild));
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
REPLACE_NODE(NULL);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
nodesClearList(info.pProject->node.pChildren);
|
||||
info.pProject->node.pChildren = NULL;
|
||||
code = uaCreateExchangeNode(pCxt, info.pSubplan, info.pProject);
|
||||
}
|
||||
++(pCxt->groupId);
|
||||
pCxt->split = true;
|
||||
return code;
|
||||
}
|
||||
|
||||
static const SSplitRule splitRuleSet[] = {
|
||||
{ .pName = "SuperTableScan", .splitFunc = stsSplit },
|
||||
{ .pName = "ChildTableJoin", .splitFunc = ctjSplit },
|
||||
{ .pName = "UnionAll", .splitFunc = uaSplit },
|
||||
};
|
||||
|
||||
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "planTestUtil.h"
|
||||
#include "planner.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
class PlanSetOpTest : public PlannerTestBase {
|
||||
|
||||
};
|
||||
|
||||
TEST_F(PlanSetOpTest, unionAll) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("select c1, c2 from t1 where c1 > 10 union all select c1, c2 from t1 where c1 > 20");
|
||||
}
|
|
@ -63,6 +63,10 @@ public:
|
|||
|
||||
SQueryPlan* pPlan = nullptr;
|
||||
doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan, NULL);
|
||||
|
||||
if (g_isDump) {
|
||||
dump();
|
||||
}
|
||||
} catch (...) {
|
||||
dump();
|
||||
throw;
|
||||
|
@ -87,6 +91,7 @@ private:
|
|||
string splitLogicPlan_;
|
||||
string scaledLogicPlan_;
|
||||
string physiPlan_;
|
||||
vector<string> physiSubplans_;
|
||||
};
|
||||
|
||||
void reset() {
|
||||
|
@ -115,6 +120,10 @@ private:
|
|||
cout << res_.scaledLogicPlan_ << endl;
|
||||
cout << "physical plan : " << endl;
|
||||
cout << res_.physiPlan_ << endl;
|
||||
cout << "physical subplan : " << endl;
|
||||
for (const auto& subplan : res_.physiSubplans_) {
|
||||
cout << subplan << endl;
|
||||
}
|
||||
}
|
||||
|
||||
void doParseSql(const string& sql, SQuery** pQuery) {
|
||||
|
@ -156,6 +165,13 @@ private:
|
|||
void doCreatePhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) {
|
||||
DO_WITH_THROW(createPhysiPlan, pCxt, pLogicPlan, pPlan, pExecNodeList);
|
||||
res_.physiPlan_ = toString((SNode*)(*pPlan));
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, (*pPlan)->pSubplans) {
|
||||
SNode* pSubplan;
|
||||
FOREACH(pSubplan, ((SNodeListNode*)pNode)->pNodeList) {
|
||||
res_.physiSubplans_.push_back(toString(pSubplan));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) {
|
||||
|
|
Loading…
Reference in New Issue