feat(query): distributed splitting of child/normal table JOIN
This commit is contained in:
parent
d8762d401f
commit
13735d7257
|
@ -231,6 +231,7 @@ typedef enum EDealRes {
|
|||
DEAL_RES_CONTINUE = 1,
|
||||
DEAL_RES_IGNORE_CHILD,
|
||||
DEAL_RES_ERROR,
|
||||
DEAL_RES_END
|
||||
} EDealRes;
|
||||
|
||||
typedef EDealRes (*FNodeWalker)(SNode* pNode, void* pContext);
|
||||
|
|
|
@ -548,7 +548,7 @@ EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
|
|||
char* dbName = nodesGetValueFromNode(node);
|
||||
strncpy(pContext, varDataVal(dbName), varDataLen(dbName));
|
||||
*((char*)pContext + varDataLen(dbName)) = 0;
|
||||
return DEAL_RES_ERROR; // stop walk
|
||||
return DEAL_RES_END; // stop walk
|
||||
}
|
||||
default:
|
||||
break;
|
||||
|
|
|
@ -46,7 +46,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
|
|||
case QUERY_NODE_OPERATOR: {
|
||||
SOperatorNode* pOpNode = (SOperatorNode*)pNode;
|
||||
res = walkNode(pOpNode->pLeft, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkNode(pOpNode->pRight, order, walker, pContext);
|
||||
}
|
||||
break;
|
||||
|
@ -63,10 +63,10 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
|
|||
case QUERY_NODE_JOIN_TABLE: {
|
||||
SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode;
|
||||
res = walkNode(pJoinTableNode->pLeft, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkNode(pJoinTableNode->pRight, order, walker, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkNode(pJoinTableNode->pOnCond, order, walker, pContext);
|
||||
}
|
||||
break;
|
||||
|
@ -80,7 +80,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
|
|||
case QUERY_NODE_STATE_WINDOW: {
|
||||
SStateWindowNode* pState = (SStateWindowNode*)pNode;
|
||||
res = walkNode(pState->pExpr, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkNode(pState->pCol, order, walker, pContext);
|
||||
}
|
||||
break;
|
||||
|
@ -88,7 +88,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
|
|||
case QUERY_NODE_SESSION_WINDOW: {
|
||||
SSessionWindowNode* pSession = (SSessionWindowNode*)pNode;
|
||||
res = walkNode(pSession->pCol, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkNode(pSession->pGap, order, walker, pContext);
|
||||
}
|
||||
break;
|
||||
|
@ -96,16 +96,16 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
|
|||
case QUERY_NODE_INTERVAL_WINDOW: {
|
||||
SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pNode;
|
||||
res = walkNode(pInterval->pInterval, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkNode(pInterval->pOffset, order, walker, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkNode(pInterval->pSliding, order, walker, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkNode(pInterval->pFill, order, walker, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkNode(pInterval->pCol, order, walker, pContext);
|
||||
}
|
||||
break;
|
||||
|
@ -126,7 +126,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
|
|||
break;
|
||||
}
|
||||
|
||||
if (DEAL_RES_ERROR != res && TRAVERSAL_POSTORDER == order) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res && TRAVERSAL_POSTORDER == order) {
|
||||
res = walker(pNode, pContext);
|
||||
}
|
||||
|
||||
|
@ -136,8 +136,9 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
|
|||
static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FNodeWalker walker, void* pContext) {
|
||||
SNode* node;
|
||||
FOREACH(node, pNodeList) {
|
||||
if (DEAL_RES_ERROR == walkNode(node, order, walker, pContext)) {
|
||||
return DEAL_RES_ERROR;
|
||||
EDealRes res = walkNode(node, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR == res || DEAL_RES_END == res) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
|
@ -185,7 +186,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
|
|||
case QUERY_NODE_OPERATOR: {
|
||||
SOperatorNode* pOpNode = (SOperatorNode*)pNode;
|
||||
res = rewriteNode(&(pOpNode->pLeft), order, rewriter, pContext);
|
||||
if (DEAL_RES_ERROR != res) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = rewriteNode(&(pOpNode->pRight), order, rewriter, pContext);
|
||||
}
|
||||
break;
|
||||
|
@ -202,10 +203,10 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
|
|||
case QUERY_NODE_JOIN_TABLE: {
|
||||
SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode;
|
||||
res = rewriteNode(&(pJoinTableNode->pLeft), order, rewriter, pContext);
|
||||
if (DEAL_RES_ERROR != res) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = rewriteNode(&(pJoinTableNode->pRight), order, rewriter, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = rewriteNode(&(pJoinTableNode->pOnCond), order, rewriter, pContext);
|
||||
}
|
||||
break;
|
||||
|
@ -219,7 +220,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
|
|||
case QUERY_NODE_STATE_WINDOW: {
|
||||
SStateWindowNode* pState = (SStateWindowNode*)pNode;
|
||||
res = rewriteNode(&pState->pExpr, order, rewriter, pContext);
|
||||
if (DEAL_RES_ERROR != res) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = rewriteNode(&pState->pCol, order, rewriter, pContext);
|
||||
}
|
||||
break;
|
||||
|
@ -227,7 +228,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
|
|||
case QUERY_NODE_SESSION_WINDOW: {
|
||||
SSessionWindowNode* pSession = (SSessionWindowNode*)pNode;
|
||||
res = rewriteNode(&pSession->pCol, order, rewriter, pContext);
|
||||
if (DEAL_RES_ERROR != res) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = rewriteNode(&pSession->pGap, order, rewriter, pContext);
|
||||
}
|
||||
break;
|
||||
|
@ -235,16 +236,16 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
|
|||
case QUERY_NODE_INTERVAL_WINDOW: {
|
||||
SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pNode;
|
||||
res = rewriteNode(&(pInterval->pInterval), order, rewriter, pContext);
|
||||
if (DEAL_RES_ERROR != res) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = rewriteNode(&(pInterval->pOffset), order, rewriter, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = rewriteNode(&(pInterval->pSliding), order, rewriter, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = rewriteNode(&(pInterval->pFill), order, rewriter, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = rewriteNode(&(pInterval->pCol), order, rewriter, pContext);
|
||||
}
|
||||
break;
|
||||
|
@ -265,7 +266,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
|
|||
break;
|
||||
}
|
||||
|
||||
if (DEAL_RES_ERROR != res && TRAVERSAL_POSTORDER == order) {
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res && TRAVERSAL_POSTORDER == order) {
|
||||
res = rewriter(pRawNode, pContext);
|
||||
}
|
||||
|
||||
|
@ -275,8 +276,9 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
|
|||
static EDealRes rewriteList(SNodeList* pNodeList, ETraversalOrder order, FNodeRewriter rewriter, void* pContext) {
|
||||
SNode** pNode;
|
||||
FOREACH_FOR_REWRITE(pNode, pNodeList) {
|
||||
if (DEAL_RES_ERROR == rewriteNode(pNode, order, rewriter, pContext)) {
|
||||
return DEAL_RES_ERROR;
|
||||
EDealRes res = rewriteNode(pNode, order, rewriter, pContext);
|
||||
if (DEAL_RES_ERROR == res || DEAL_RES_END == res) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
|
|
|
@ -328,8 +328,26 @@ SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType typ
|
|||
CHECK_OUT_OF_MEM(cond);
|
||||
cond->condType = type;
|
||||
cond->pParameterList = nodesMakeList();
|
||||
nodesListAppend(cond->pParameterList, pParam1);
|
||||
nodesListAppend(cond->pParameterList, pParam2);
|
||||
if ((QUERY_NODE_LOGIC_CONDITION == nodeType(pParam1) && type != ((SLogicConditionNode*)pParam1)->condType) ||
|
||||
(QUERY_NODE_LOGIC_CONDITION == nodeType(pParam2) && type != ((SLogicConditionNode*)pParam2)->condType)) {
|
||||
nodesListAppend(cond->pParameterList, pParam1);
|
||||
nodesListAppend(cond->pParameterList, pParam2);
|
||||
} else {
|
||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam1)) {
|
||||
nodesListAppendList(cond->pParameterList, ((SLogicConditionNode*)pParam1)->pParameterList);
|
||||
((SLogicConditionNode*)pParam1)->pParameterList = NULL;
|
||||
nodesDestroyNode(pParam1);
|
||||
} else {
|
||||
nodesListAppend(cond->pParameterList, pParam1);
|
||||
}
|
||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam2)) {
|
||||
nodesListAppendList(cond->pParameterList, ((SLogicConditionNode*)pParam2)->pParameterList);
|
||||
((SLogicConditionNode*)pParam2)->pParameterList = NULL;
|
||||
nodesDestroyNode(pParam2);
|
||||
} else {
|
||||
nodesListAppend(cond->pParameterList, pParam2);
|
||||
}
|
||||
}
|
||||
return (SNode*)cond;
|
||||
}
|
||||
|
||||
|
|
|
@ -146,6 +146,7 @@ public:
|
|||
meta_[db][tbname].reset(new MockTableMeta());
|
||||
meta_[db][tbname]->schema = table.release();
|
||||
meta_[db][tbname]->schema->uid = id_++;
|
||||
meta_[db][tbname]->schema->tableType = TSDB_CHILD_TABLE;
|
||||
|
||||
SVgroupInfo vgroup = {.vgId = vgid, .hashBegin = 0, .hashEnd = 0,};
|
||||
addEpIntoEpSet(&vgroup.epSet, "dnode_1", 6030);
|
||||
|
@ -197,11 +198,11 @@ public:
|
|||
std::cout << "Table:" << table.first << std::endl;
|
||||
std::cout << SH("Field") << SH("Type") << SH("DataType") << IH("Bytes") << std::endl;
|
||||
std::cout << SL(3, 1) << std::endl;
|
||||
int16_t numOfTags = schema->tableInfo.numOfTags;
|
||||
int16_t numOfFields = numOfTags + schema->tableInfo.numOfColumns;
|
||||
int16_t numOfColumns = schema->tableInfo.numOfColumns;
|
||||
int16_t numOfFields = numOfColumns + schema->tableInfo.numOfTags;
|
||||
for (int16_t i = 0; i < numOfFields; ++i) {
|
||||
const SSchema* col = schema->schema + i;
|
||||
std::cout << SF(std::string(col->name)) << SH(ftToString(i, numOfTags)) << SH(dtToString(col->type)) << IF(col->bytes) << std::endl;
|
||||
std::cout << SF(std::string(col->name)) << SH(ftToString(i, numOfColumns)) << SH(dtToString(col->type)) << IF(col->bytes) << std::endl;
|
||||
}
|
||||
std::cout << std::endl;
|
||||
}
|
||||
|
@ -262,8 +263,8 @@ private:
|
|||
return tDataTypes[type].name;
|
||||
}
|
||||
|
||||
std::string ftToString(int16_t colid, int16_t numOfTags) const {
|
||||
return (0 == colid ? "column" : (colid <= numOfTags ? "tag" : "column"));
|
||||
std::string ftToString(int16_t colid, int16_t numOfColumns) const {
|
||||
return (0 == colid ? "column" : (colid <= numOfColumns ? "tag" : "column"));
|
||||
}
|
||||
|
||||
STableMeta* getTableSchemaMeta(const std::string& db, const std::string& tbname) const {
|
||||
|
|
|
@ -694,7 +694,6 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
|
|||
}
|
||||
|
||||
return code;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
|
||||
|
|
|
@ -41,6 +41,21 @@ typedef struct SOsdInfo {
|
|||
SNodeList* pDsoFuncs;
|
||||
} SOsdInfo;
|
||||
|
||||
typedef struct SCpdIsMultiTableCondCxt {
|
||||
SNodeList* pLeftCols;
|
||||
SNodeList* pRightCols;
|
||||
bool havaLeftCol;
|
||||
bool haveRightCol;
|
||||
} SCpdIsMultiTableCondCxt;
|
||||
|
||||
typedef enum ECondAction {
|
||||
COND_ACTION_STAY = 1,
|
||||
COND_ACTION_PUSH_JOIN,
|
||||
COND_ACTION_PUSH_LEFT_CHILD,
|
||||
COND_ACTION_PUSH_RIGHT_CHILD
|
||||
// after supporting outer join, there are other possibilities
|
||||
} ECondAction;
|
||||
|
||||
static bool osdMayBeOptimized(SLogicNode* pNode) {
|
||||
if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_OSD)) {
|
||||
return false;
|
||||
|
@ -152,34 +167,227 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t cpdPartitionCondition(SJoinLogicNode* pJoin, SNodeList** pMultiTableCond, SNodeList** pSingleTableCond) {
|
||||
// todo
|
||||
static bool belongThisTable(SNode* pCondCol, SNodeList* pTableCols) {
|
||||
SNode* pTableCol = NULL;
|
||||
FOREACH(pTableCol, pTableCols) {
|
||||
if (nodesEqualNode(pCondCol, pTableCol)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static EDealRes cpdIsMultiTableCondImpl(SNode* pNode, void* pContext) {
|
||||
SCpdIsMultiTableCondCxt* pCxt = pContext;
|
||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||
if (belongThisTable(pNode, pCxt->pLeftCols)) {
|
||||
pCxt->havaLeftCol = true;
|
||||
} else if (belongThisTable(pNode, pCxt->pRightCols)) {
|
||||
pCxt->haveRightCol = true;
|
||||
}
|
||||
return pCxt->havaLeftCol && pCxt->haveRightCol ? DEAL_RES_END : DEAL_RES_CONTINUE;
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static ECondAction cpdCondAction(EJoinType joinType, SNodeList* pLeftCols, SNodeList* pRightCols, SNode* pNode) {
|
||||
SCpdIsMultiTableCondCxt cxt = { .pLeftCols = pLeftCols, .pRightCols = pRightCols, .havaLeftCol = false, .haveRightCol = false };
|
||||
nodesWalkExpr(pNode, cpdIsMultiTableCondImpl, &cxt);
|
||||
return (JOIN_TYPE_INNER != joinType ? COND_ACTION_STAY :
|
||||
(cxt.havaLeftCol && cxt.haveRightCol ? COND_ACTION_PUSH_JOIN : (cxt.havaLeftCol ? COND_ACTION_PUSH_LEFT_CHILD : COND_ACTION_PUSH_RIGHT_CHILD)));
|
||||
}
|
||||
|
||||
static int32_t cpdMakeCond(SNodeList** pConds, SNode** pCond) {
|
||||
if (NULL == *pConds) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (1 == LIST_LENGTH(*pConds)) {
|
||||
*pCond = nodesListGetNode(*pConds, 0);
|
||||
nodesClearList(*pConds);
|
||||
} else {
|
||||
SLogicConditionNode* pLogicCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
|
||||
if (NULL == pLogicCond) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pLogicCond->condType = LOGIC_COND_TYPE_AND;
|
||||
pLogicCond->pParameterList = *pConds;
|
||||
*pCond = (SNode*)pLogicCond;
|
||||
}
|
||||
*pConds = NULL;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t cpdPushJoinCondToOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNodeList* pMultiTableCond) {
|
||||
// todo
|
||||
static int32_t cpdPartitionLogicCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, SNode** pRightChildCond) {
|
||||
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pJoin->node.pConditions;
|
||||
if (LOGIC_COND_TYPE_AND != pLogicCond->condType) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
|
||||
SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SNodeList* pOnConds = NULL;
|
||||
SNodeList* pLeftChildConds = NULL;
|
||||
SNodeList* pRightChildConds = NULL;
|
||||
SNodeList* pRemainConds = NULL;
|
||||
SNode* pCond = NULL;
|
||||
FOREACH(pCond, pLogicCond->pParameterList) {
|
||||
ECondAction condAction = cpdCondAction(pJoin->joinType, pLeftCols, pRightCols, pCond);
|
||||
if (COND_ACTION_PUSH_JOIN == condAction) {
|
||||
code = nodesListMakeAppend(&pOnConds, nodesCloneNode(pCond));
|
||||
} else if (COND_ACTION_PUSH_LEFT_CHILD == condAction) {
|
||||
code = nodesListMakeAppend(&pLeftChildConds, nodesCloneNode(pCond));
|
||||
} else if (COND_ACTION_PUSH_RIGHT_CHILD == condAction) {
|
||||
code = nodesListMakeAppend(&pRightChildConds, nodesCloneNode(pCond));
|
||||
} else {
|
||||
code = nodesListMakeAppend(&pRemainConds, nodesCloneNode(pCond));
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
SNode* pTempOnCond = NULL;
|
||||
SNode* pTempLeftChildCond = NULL;
|
||||
SNode* pTempRightChildCond = NULL;
|
||||
SNode* pTempRemainCond = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = cpdMakeCond(&pOnConds, &pTempOnCond);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = cpdMakeCond(&pLeftChildConds, &pTempLeftChildCond);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = cpdMakeCond(&pRightChildConds, &pTempRightChildCond);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = cpdMakeCond(&pRemainConds, &pTempRemainCond);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pOnCond = pTempOnCond;
|
||||
*pLeftChildCond = pTempLeftChildCond;
|
||||
*pRightChildCond = pTempRightChildCond;
|
||||
nodesDestroyNode(pJoin->node.pConditions);
|
||||
pJoin->node.pConditions = pTempRemainCond;
|
||||
} else {
|
||||
nodesDestroyList(pOnConds);
|
||||
nodesDestroyList(pLeftChildConds);
|
||||
nodesDestroyList(pRightChildConds);
|
||||
nodesDestroyList(pRemainConds);
|
||||
nodesDestroyNode(pTempOnCond);
|
||||
nodesDestroyNode(pTempLeftChildCond);
|
||||
nodesDestroyNode(pTempRightChildCond);
|
||||
nodesDestroyNode(pTempRemainCond);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t cpdPartitionOpCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, SNode** pRightChildCond) {
|
||||
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
|
||||
SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets;
|
||||
ECondAction condAction = cpdCondAction(pJoin->joinType, pLeftCols, pRightCols, pJoin->node.pConditions);
|
||||
if (COND_ACTION_STAY == condAction) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (COND_ACTION_PUSH_JOIN == condAction) {
|
||||
*pOnCond = pJoin->node.pConditions;
|
||||
} else if (COND_ACTION_PUSH_LEFT_CHILD == condAction) {
|
||||
*pLeftChildCond = pJoin->node.pConditions;
|
||||
} else if (COND_ACTION_PUSH_RIGHT_CHILD == condAction) {
|
||||
*pRightChildCond = pJoin->node.pConditions;
|
||||
}
|
||||
pJoin->node.pConditions = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t cpdPushJoinCondToChildren(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNodeList* pSingleTableCond) {
|
||||
// todo
|
||||
return TSDB_CODE_SUCCESS;
|
||||
static int32_t cpdPartitionCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, SNode** pRightChildCond) {
|
||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->node.pConditions)) {
|
||||
return cpdPartitionLogicCond(pJoin, pOnCond, pLeftChildCond, pRightChildCond);
|
||||
} else {
|
||||
return cpdPartitionOpCond(pJoin, pOnCond, pLeftChildCond, pRightChildCond);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t cpdCondAppend(SOptimizeContext* pCxt, SNode** pCond, SNode** pAdditionalCond) {
|
||||
if (NULL == *pCond) {
|
||||
TSWAP(*pCond, *pAdditionalCond, SNode*);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pCond)) {
|
||||
code = nodesListAppend(((SLogicConditionNode*)*pCond)->pParameterList, *pAdditionalCond);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pAdditionalCond = NULL;
|
||||
}
|
||||
} else {
|
||||
SLogicConditionNode* pLogicCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
|
||||
if (NULL == pLogicCond) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pLogicCond->condType = LOGIC_COND_TYPE_AND;
|
||||
code = nodesListMakeAppend(&pLogicCond->pParameterList, *pAdditionalCond);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pAdditionalCond = NULL;
|
||||
code = nodesListMakeAppend(&pLogicCond->pParameterList, *pCond);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pCond = (SNode*)pLogicCond;
|
||||
} else {
|
||||
nodesDestroyNode(pLogicCond);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t cpdPushCondToOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode** pCond) {
|
||||
return cpdCondAppend(pCxt, &pJoin->pOnConditions, pCond);
|
||||
}
|
||||
|
||||
static int32_t cpdPushCondToScan(SOptimizeContext* pCxt, SScanLogicNode* pScan, SNode** pCond) {
|
||||
return cpdCondAppend(pCxt, &pScan->node.pConditions, pCond);
|
||||
}
|
||||
|
||||
static int32_t cpdPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) {
|
||||
switch (nodeType(pChild)) {
|
||||
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||
return cpdPushCondToScan(pCxt, (SScanLogicNode*)pChild, pCond);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||
if (NULL != pJoin->node.pConditions) {
|
||||
SNodeList* pMultiTableCond = NULL;
|
||||
SNodeList* pSingleTableCond = NULL;
|
||||
int32_t code = cpdPartitionCondition(pJoin, &pMultiTableCond, &pSingleTableCond);
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pMultiTableCond) {
|
||||
code = cpdPushJoinCondToOnCond(pCxt, pJoin, pMultiTableCond);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pSingleTableCond) {
|
||||
code = cpdPushJoinCondToChildren(pCxt, pJoin, pSingleTableCond);
|
||||
}
|
||||
if (NULL == pJoin->node.pConditions) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
SNode* pOnCond = NULL;
|
||||
SNode* pLeftChildCond = NULL;
|
||||
SNode* pRightChildCond = NULL;
|
||||
int32_t code = cpdPartitionCond(pJoin, &pOnCond, &pLeftChildCond, &pRightChildCond);
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) {
|
||||
code = cpdPushCondToOnCond(pCxt, pJoin, &pOnCond);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pLeftChildCond) {
|
||||
code = cpdPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0), &pLeftChildCond);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pRightChildCond) {
|
||||
code = cpdPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyNode(pOnCond);
|
||||
nodesDestroyNode(pLeftChildCond);
|
||||
nodesDestroyNode(pRightChildCond);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t cpdPushAggCondition(SOptimizeContext* pCxt, SAggLogicNode* pAgg) {
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#define SPLIT_FLAG_MASK(n) (1 << n)
|
||||
|
||||
#define SPLIT_FLAG_STS SPLIT_FLAG_MASK(0)
|
||||
#define SPLIT_FLAG_CTJ SPLIT_FLAG_MASK(1)
|
||||
|
||||
#define SPLIT_FLAG_SET_MASK(val, mask) (val) |= (mask)
|
||||
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
||||
|
@ -39,43 +40,14 @@ typedef struct SStsInfo {
|
|||
SLogicSubplan* pSubplan;
|
||||
} SStsInfo;
|
||||
|
||||
static SLogicNode* stsMatchByNode(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) &&
|
||||
NULL != ((SScanLogicNode*)pNode)->pVgroupList && ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups > 1) {
|
||||
return pNode;
|
||||
}
|
||||
SNode* pChild;
|
||||
FOREACH(pChild, pNode->pChildren) {
|
||||
SLogicNode* pSplitNode = stsMatchByNode((SLogicNode*)pChild);
|
||||
if (NULL != pSplitNode) {
|
||||
return pSplitNode;
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
typedef struct SCtjInfo {
|
||||
SScanLogicNode* pScan;
|
||||
SLogicSubplan* pSubplan;
|
||||
} SCtjInfo;
|
||||
|
||||
static void stsFindSplitNode(SLogicSubplan* pSubplan, SStsInfo* pInfo) {
|
||||
SLogicNode* pSplitNode = stsMatchByNode(pSubplan->pNode);
|
||||
if (NULL != pSplitNode) {
|
||||
pInfo->pScan = (SScanLogicNode*)pSplitNode;
|
||||
pInfo->pSubplan = pSubplan;
|
||||
}
|
||||
}
|
||||
static void stsMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, SStsInfo* pInfo) {
|
||||
if (!SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, SPLIT_FLAG_STS)) {
|
||||
stsFindSplitNode(pSubplan, pInfo);
|
||||
}
|
||||
SNode* pChild;
|
||||
FOREACH(pChild, pSubplan->pChildren) {
|
||||
stsMatch(pCxt, (SLogicSubplan*)pChild, pInfo);
|
||||
if (NULL != pInfo->pScan) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
typedef bool (*FSplFindSplitNode)(SLogicSubplan* pSubplan, SStsInfo* pInfo);
|
||||
|
||||
static SLogicSubplan* stsCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan) {
|
||||
static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan, int32_t flag) {
|
||||
SLogicSubplan* pSubplan = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
|
||||
if (NULL == pSubplan) {
|
||||
return NULL;
|
||||
|
@ -84,11 +56,11 @@ static SLogicSubplan* stsCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode*
|
|||
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
|
||||
pSubplan->pNode = (SLogicNode*)nodesCloneNode(pScan);
|
||||
TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList, SVgroupsInfo*);
|
||||
SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, SPLIT_FLAG_STS);
|
||||
SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, flag);
|
||||
return pSubplan;
|
||||
}
|
||||
|
||||
static int32_t stsCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan) {
|
||||
static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan, ESubplanType subplanType) {
|
||||
SExchangeLogicNode* pExchange = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
|
||||
if (NULL == pExchange) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -119,10 +91,48 @@ static int32_t stsCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) {
|
||||
if (!SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, flag)) {
|
||||
if (func(pSubplan, pInfo)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
SNode* pChild;
|
||||
FOREACH(pChild, pSubplan->pChildren) {
|
||||
if (splMatch(pCxt, (SLogicSubplan*)pChild, flag, func, pInfo)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static SLogicNode* stsMatchByNode(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) &&
|
||||
NULL != ((SScanLogicNode*)pNode)->pVgroupList && ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups > 1) {
|
||||
return pNode;
|
||||
}
|
||||
SNode* pChild;
|
||||
FOREACH(pChild, pNode->pChildren) {
|
||||
SLogicNode* pSplitNode = stsMatchByNode((SLogicNode*)pChild);
|
||||
if (NULL != pSplitNode) {
|
||||
return pSplitNode;
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static bool stsFindSplitNode(SLogicSubplan* pSubplan, SStsInfo* pInfo) {
|
||||
SLogicNode* pSplitNode = stsMatchByNode(pSubplan->pNode);
|
||||
if (NULL != pSplitNode) {
|
||||
pInfo->pScan = (SScanLogicNode*)pSplitNode;
|
||||
pInfo->pSubplan = pSubplan;
|
||||
}
|
||||
return NULL != pSplitNode;
|
||||
}
|
||||
|
||||
static int32_t stsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
||||
SStsInfo info = {0};
|
||||
stsMatch(pCxt, pSubplan, &info);
|
||||
if (NULL == info.pScan) {
|
||||
if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_STS, stsFindSplitNode, &info)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (NULL == info.pSubplan->pChildren) {
|
||||
|
@ -131,9 +141,61 @@ static int32_t stsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
int32_t code = nodesListStrictAppend(info.pSubplan->pChildren, stsCreateScanSubplan(pCxt, info.pScan));
|
||||
int32_t code = nodesListStrictAppend(info.pSubplan->pChildren, splCreateScanSubplan(pCxt, info.pScan, SPLIT_FLAG_STS));
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stsCreateExchangeNode(pCxt, info.pSubplan, info.pScan);
|
||||
code = splCreateExchangeNode(pCxt, info.pSubplan, info.pScan, SUBPLAN_TYPE_MERGE);
|
||||
}
|
||||
++(pCxt->groupId);
|
||||
pCxt->split = true;
|
||||
return code;
|
||||
}
|
||||
|
||||
static bool ctjIsSingleTable(int8_t tableType) {
|
||||
return (TSDB_CHILD_TABLE == tableType || TSDB_NORMAL_TABLE == tableType);
|
||||
}
|
||||
|
||||
static SLogicNode* ctjMatchByNode(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode)) {
|
||||
SLogicNode* pLeft = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
|
||||
SLogicNode* pRight = (SLogicNode*)nodesListGetNode(pNode->pChildren, 1);
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pLeft) && ctjIsSingleTable(((SScanLogicNode*)pLeft)->pMeta->tableType) &&
|
||||
QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pRight) && ctjIsSingleTable(((SScanLogicNode*)pRight)->pMeta->tableType)) {
|
||||
return pRight;
|
||||
}
|
||||
}
|
||||
SNode* pChild;
|
||||
FOREACH(pChild, pNode->pChildren) {
|
||||
SLogicNode* pSplitNode = ctjMatchByNode((SLogicNode*)pChild);
|
||||
if (NULL != pSplitNode) {
|
||||
return pSplitNode;
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static bool ctjFindSplitNode(SLogicSubplan* pSubplan, SStsInfo* pInfo) {
|
||||
SLogicNode* pSplitNode = ctjMatchByNode(pSubplan->pNode);
|
||||
if (NULL != pSplitNode) {
|
||||
pInfo->pScan = (SScanLogicNode*)pSplitNode;
|
||||
pInfo->pSubplan = pSubplan;
|
||||
}
|
||||
return NULL != pSplitNode;
|
||||
}
|
||||
|
||||
static int32_t ctjSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
||||
SCtjInfo info = {0};
|
||||
if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_CTJ, ctjFindSplitNode, &info)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (NULL == info.pSubplan->pChildren) {
|
||||
info.pSubplan->pChildren = nodesMakeList();
|
||||
if (NULL == info.pSubplan->pChildren) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
int32_t code = nodesListStrictAppend(info.pSubplan->pChildren, splCreateScanSubplan(pCxt, info.pScan, SPLIT_FLAG_CTJ));
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = splCreateExchangeNode(pCxt, info.pSubplan, info.pScan, info.pSubplan->subplanType);
|
||||
}
|
||||
++(pCxt->groupId);
|
||||
pCxt->split = true;
|
||||
|
@ -141,7 +203,8 @@ static int32_t stsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
|||
}
|
||||
|
||||
static const SSplitRule splitRuleSet[] = {
|
||||
{ .pName = "SuperTableScan", .splitFunc = stsSplit }
|
||||
{ .pName = "SuperTableScan", .splitFunc = stsSplit },
|
||||
{ .pName = "ChildTableJoin", .splitFunc = ctjSplit },
|
||||
};
|
||||
|
||||
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
|
||||
|
|
|
@ -70,6 +70,12 @@ protected:
|
|||
cout << "unformatted logic plan : " << endl;
|
||||
cout << toString((const SNode*)pLogicNode, false) << endl;
|
||||
|
||||
code = optimizeLogicPlan(&cxt, pLogicNode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
cout << "sql:[" << cxt_.pSql << "] optimizeLogicPlan code:" << code << ", strerror:" << tstrerror(code) << endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
SLogicSubplan* pLogicSubplan = nullptr;
|
||||
code = splitLogicPlan(&cxt, pLogicNode, &pLogicSubplan);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -174,13 +180,13 @@ TEST_F(PlannerTest, selectStableBasic) {
|
|||
TEST_F(PlannerTest, selectJoin) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
bind("SELECT * FROM st1s1 t1, st1s2 t2 where t1.ts = t2.ts");
|
||||
ASSERT_TRUE(run());
|
||||
// bind("SELECT t1.c1, t2.c2 FROM st1s1 t1, st1s2 t2 where t1.ts = t2.ts");
|
||||
// ASSERT_TRUE(run());
|
||||
|
||||
bind("SELECT * FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts where t1.c1 > t2.c1");
|
||||
ASSERT_TRUE(run());
|
||||
// bind("SELECT t1.*, t2.* FROM st1s1 t1, st1s2 t2 where t1.ts = t2.ts");
|
||||
// ASSERT_TRUE(run());
|
||||
|
||||
bind("SELECT t1.* FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts where t1.c1 > t2.c1");
|
||||
bind("SELECT t1.c1, t2.c1 FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts where t1.c1 > t2.c1 and t1.c2 = 'abc' and t2.c2 = 'qwe'");
|
||||
ASSERT_TRUE(run());
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue