enh: optimize stable join

This commit is contained in:
dapan1121 2023-06-28 19:39:04 +08:00
parent 8f39b9d2e4
commit 81a2bf10cf
9 changed files with 385 additions and 67 deletions

View File

@ -233,6 +233,7 @@ typedef enum ENodeType {
QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC,
QUERY_NODE_LOGIC_PLAN_INTERP_FUNC,
QUERY_NODE_LOGIC_PLAN_GROUP_CACHE,
QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL,
QUERY_NODE_LOGIC_SUBPLAN,
QUERY_NODE_LOGIC_PLAN,

View File

@ -42,6 +42,7 @@ typedef enum EGroupAction {
typedef struct SLogicNode {
ENodeType type;
bool dynamicOp;
SNodeList* pTargets; // SColumnNode
SNode* pConditions;
SNodeList* pChildren;
@ -114,6 +115,7 @@ typedef struct SJoinLogicNode {
SNode* pPrimKeyEqCond;
SNode* pColEqCond;
SNode* pTagEqCond;
SNode* pTagOnCond;
SNode* pOtherOnCond;
bool isSingleTableJoin;
bool hasSubQuery;
@ -157,9 +159,13 @@ typedef struct SInterpFuncLogicNode {
typedef struct SGroupCacheLogicNode {
SLogicNode node;
SNode* pGroupCol;
SNodeList* pGroupCols;
} SGroupCacheLogicNode;
typedef struct SDynQueryCtrlLogicNode {
SLogicNode node;
EDynQueryType qType;
} SDynQueryCtrlLogicNode;
typedef enum EModifyTableType { MODIFY_TABLE_TYPE_INSERT = 1, MODIFY_TABLE_TYPE_DELETE } EModifyTableType;

View File

@ -180,6 +180,10 @@ typedef enum EJoinAlgorithm {
JOIN_ALGO_HASH,
} EJoinAlgorithm;
typedef enum EDynQueryType {
DYN_QTYPE_STB_HASH = 1,
} EDynQueryType;
typedef struct SJoinTableNode {
STableNode table; // QUERY_NODE_JOIN_TABLE
EJoinType joinType;

View File

@ -403,11 +403,13 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(joinType);
COPY_SCALAR_FIELD(joinAlgo);
CLONE_NODE_FIELD(pPrimKeyEqCond);
CLONE_NODE_FIELD(pColEqCond);
CLONE_NODE_FIELD(pTagEqCond);
CLONE_NODE_FIELD(pOtherOnCond);
COPY_SCALAR_FIELD(isSingleTableJoin);
COPY_SCALAR_FIELD(hasSubQuery);
return TSDB_CODE_SUCCESS;
}

View File

@ -295,6 +295,10 @@ const char* nodesNodeName(ENodeType type) {
return "LogicIndefRowsFunc";
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
return "LogicInterpFunc";
case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE:
return "LogicGroupCache";
case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL:
return "LogicDynamicQueryCtrl";
case QUERY_NODE_LOGIC_SUBPLAN:
return "LogicSubplan";
case QUERY_NODE_LOGIC_PLAN:
@ -1172,6 +1176,55 @@ static int32_t jsonToLogicInterpFuncNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkGroupCacheLogicPlanGroupCols = "GroupCols";
static int32_t logicGroupCacheNodeToJson(const void* pObj, SJson* pJson) {
const SGroupCacheLogicNode* pNode = (const SGroupCacheLogicNode*)pObj;
int32_t code = logicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkGroupCacheLogicPlanGroupCols, pNode->pGroupCols);
}
return code;
}
static int32_t jsonToLogicGroupCacheNode(const SJson* pJson, void* pObj) {
SGroupCacheLogicNode* pNode = (SGroupCacheLogicNode*)pObj;
int32_t code = jsonToLogicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkGroupCacheLogicPlanGroupCols, &pNode->pGroupCols);
}
return code;
}
static const char* jkDynQueryCtrlLogicPlanQueryType = "QueryType";
static int32_t logicDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) {
const SDynQueryCtrlLogicNode* pNode = (const SDynQueryCtrlLogicNode*)pObj;
int32_t code = logicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlLogicPlanQueryType, pNode->qType);
}
return code;
}
static int32_t jsonToLogicDynQueryCtrlNode(const SJson* pJson, void* pObj) {
SDynQueryCtrlLogicNode* pNode = (SDynQueryCtrlLogicNode*)pObj;
int32_t code = jsonToLogicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkDynQueryCtrlLogicPlanQueryType, pNode->qType, code);
}
return code;
}
static const char* jkSubplanIdQueryId = "QueryId";
static const char* jkSubplanIdGroupId = "GroupId";
static const char* jkSubplanIdSubplanId = "SubplanId";
@ -1426,9 +1479,11 @@ static int32_t jsonToLogicPlan(const SJson* pJson, void* pObj) {
}
static const char* jkJoinLogicPlanJoinType = "JoinType";
static const char* jkJoinLogicPlanOnConditions = "OnConditions";
static const char* jkJoinLogicPlanJoinAlgo = "JoinAlgo";
static const char* jkJoinLogicPlanOnConditions = "OtherOnCond";
static const char* jkJoinLogicPlanPrimKeyEqCondition = "PrimKeyEqCond";
static const char* jkJoinLogicPlanColEqCondition = "ColumnEqCond";
static const char* jkJoinLogicPlanTagEqCondition = "TagEqCond";
static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) {
const SJoinLogicNode* pNode = (const SJoinLogicNode*)pObj;
@ -1437,14 +1492,20 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinLogicPlanJoinType, pNode->joinType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinLogicPlanJoinAlgo, pNode->joinAlgo);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinLogicPlanPrimKeyEqCondition, nodeToJson, pNode->pPrimKeyEqCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOtherOnCond);
code = tjsonAddObject(pJson, jkJoinLogicPlanColEqCondition, nodeToJson, pNode->pColEqCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinLogicPlanColEqCondition, nodeToJson, pNode->pColEqCond);
code = tjsonAddObject(pJson, jkJoinLogicPlanTagEqCondition, nodeToJson, pNode->pTagEqCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOtherOnCond);
}
return code;
}
@ -1457,14 +1518,21 @@ static int32_t jsonToLogicJoinNode(const SJson* pJson, void* pObj) {
tjsonGetNumberValue(pJson, jkJoinLogicPlanJoinType, pNode->joinType, code);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinLogicPlanPrimKeyEqCondition, &pNode->pPrimKeyEqCond);
tjsonGetNumberValue(pJson, jkJoinLogicPlanJoinAlgo, pNode->joinAlgo, code);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pOtherOnCond);
code = jsonToNodeObject(pJson, jkJoinLogicPlanPrimKeyEqCondition, &pNode->pPrimKeyEqCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinLogicPlanColEqCondition, &pNode->pColEqCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinLogicPlanTagEqCondition, &pNode->pTagEqCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pOtherOnCond);
}
return code;
}
@ -6577,6 +6645,10 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return logicIndefRowsFuncNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
return logicInterpFuncNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE:
return logicGroupCacheNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL:
return logicDynQueryCtrlNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_SUBPLAN:
return logicSubplanToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN:
@ -6895,6 +6967,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToLogicIndefRowsFuncNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
return jsonToLogicInterpFuncNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE:
return jsonToLogicGroupCacheNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL:
return jsonToLogicDynQueryCtrlNode(pJson, pObj);
case QUERY_NODE_LOGIC_SUBPLAN:
return jsonToLogicSubplan(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN:

View File

@ -492,6 +492,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SInterpFuncLogicNode));
case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE:
return makeNode(type, sizeof(SGroupCacheLogicNode));
case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL:
return makeNode(type, sizeof(SDynQueryCtrlLogicNode));
case QUERY_NODE_LOGIC_SUBPLAN:
return makeNode(type, sizeof(SLogicSubplan));
case QUERY_NODE_LOGIC_PLAN:
@ -1180,6 +1182,17 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pLogicNode->pTimeSeries);
break;
}
case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE: {
SGroupCacheLogicNode* pLogicNode = (SGroupCacheLogicNode*)pNode;
destroyLogicNode((SLogicNode*)pLogicNode);
nodesDestroyList(pLogicNode->pGroupCols);
break;
}
case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL: {
SDynQueryCtrlLogicNode* pLogicNode = (SDynQueryCtrlLogicNode*)pNode;
destroyLogicNode((SLogicNode*)pLogicNode);
break;
}
case QUERY_NODE_LOGIC_SUBPLAN: {
SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
nodesDestroyList(pSubplan->pChildren);

View File

@ -763,14 +763,18 @@ static bool pushDownCondOptIsColEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond,
return false;
}
SOperatorNode* pOper = (SOperatorNode*)pCond;
if (OP_TYPE_EQUAL != pOper->opType) {
return false;
}
if (QUERY_NODE_COLUMN != nodeType(pOper->pLeft) || QUERY_NODE_COLUMN != nodeType(pOper->pRight)) {
return false;
}
SColumnNode* pLeft = (SColumnNode*)(pOper->pLeft);
SColumnNode* pRight = (SColumnNode*)(pOper->pRight);
*allTags = (COLUMN_TYPE_TAG == pLeft->colType) && (COLUMN_TYPE_TAG == pRight->colType);
if (OP_TYPE_EQUAL != pOper->opType) {
return false;
}
//TODO: add cast to operator and remove this restriction of optimization
if (pLeft->node.resType.type != pRight->node.resType.type || pLeft->node.resType.bytes != pRight->node.resType.bytes) {
return false;
@ -784,7 +788,6 @@ static bool pushDownCondOptIsColEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond,
isEqual = pushDownCondOptIsTableColumn(pOper->pRight, pLeftCols);
}
if (isEqual) {
*allTags = (COLUMN_TYPE_TAG == pLeft->colType) && (COLUMN_TYPE_TAG == pRight->colType);
}
return isEqual;
}
@ -795,30 +798,42 @@ static int32_t pushDownCondOptJoinExtractEqualOnLogicCond(SJoinLogicNode* pJoin)
int32_t code = TSDB_CODE_SUCCESS;
SNodeList* pColEqOnConds = NULL;
SNodeList* pTagEqOnConds = NULL;
SNodeList* pTagOnConds = NULL;
SNode* pCond = NULL;
bool allTags = false;
FOREACH(pCond, pLogicCond->pParameterList) {
allTags = false;
if (pushDownCondOptIsColEqualOnCond(pJoin, pCond, &allTags)) {
if (allTags) {
code = nodesListMakeAppend(&pTagEqOnConds, nodesCloneNode(pCond));
} else {
code = nodesListMakeAppend(&pColEqOnConds, nodesCloneNode(pCond));
}
} else if (allTags) {
code = nodesListMakeAppend(&pTagOnConds, nodesCloneNode(pCond));
}
if (code) {
break;
}
}
SNode* pTempTagEqCond = NULL;
SNode* pTempColEqCond = NULL;
SNode* pTempTagOnCond = NULL;
if (TSDB_CODE_SUCCESS == code) {
code = nodesMergeConds(&pTempColEqCond, &pColEqOnConds);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesMergeConds(&pTempTagEqCond, &pTagEqOnConds);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesMergeConds(&pTempTagOnCond, &pTagOnConds);
}
if (TSDB_CODE_SUCCESS == code) {
pJoin->pColEqCond = pTempColEqCond;
pJoin->pTagEqCond = pTempTagEqCond;
pJoin->pTagOnCond = pTempTagOnCond;
return TSDB_CODE_SUCCESS;
} else {
nodesDestroyList(pColEqOnConds);
@ -846,15 +861,62 @@ static int32_t pushDownCondOptJoinExtractEqualOnCond(SOptimizeContext* pCxt, SJo
} else {
pJoin->pColEqCond = nodesCloneNode(pJoin->pOtherOnCond);
}
} else if (allTags) {
pJoin->pTagOnCond = nodesCloneNode(pJoin->pOtherOnCond);
}
return TSDB_CODE_SUCCESS;
}
static int32_t pushDownCondOptAppendFilterCol(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (NULL == pJoin->pOtherOnCond) {
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
SNodeList* pCondCols = nodesMakeList();
SNodeList* pTargets = NULL;
if (NULL == pCondCols) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
code = nodesCollectColumnsFromNode(pJoin->pOtherOnCond, NULL, COLLECT_COL_TYPE_ALL, &pCondCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pCondCols, &pTargets);
}
nodesDestroyList(pCondCols);
if (TSDB_CODE_SUCCESS == code) {
SNode* pNode = NULL;
FOREACH(pNode, pTargets) {
SNode* pTmp = NULL;
bool found = false;
FOREACH(pTmp, pJoin->node.pTargets) {
if (nodesEqualNode(pTmp, pNode)) {
found = true;
break;
}
}
if (!found) {
nodesListStrictAppend(pJoin->node.pTargets, pNode);
}
}
}
nodesDestroyList(pTargets);
return code;
}
static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) {
return TSDB_CODE_SUCCESS;
}
if (pJoin->joinAlgo != JOIN_ALGO_UNKNOWN) {
return TSDB_CODE_SUCCESS;
}
if (NULL == pJoin->node.pConditions) {
int32_t code = pushDownCondOptJoinExtractCond(pCxt, pJoin);
@ -889,6 +951,10 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p
code = pushDownCondOptJoinExtractEqualOnCond(pCxt, pJoin);
}
if (TSDB_CODE_SUCCESS == code) {
code = pushDownCondOptAppendFilterCol(pCxt, pJoin);
}
if (TSDB_CODE_SUCCESS == code) {
OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE);
pCxt->optimized = true;
@ -2971,13 +3037,23 @@ static bool stbJoinOptShouldBeOptimized(SLogicNode* pNode) {
}
SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
if (pJoin->isSingleTableJoin || NULL == pJoin->pTagEqCond || pNode->pChildren->length != 2 || pJoin->hasSubQuery || pJoin->joinAlgo != UNKNOWN_JOIN_ALGO) {
if (pJoin->isSingleTableJoin || NULL == pJoin->pTagEqCond || pNode->pChildren->length != 2 || pJoin->hasSubQuery || pJoin->joinAlgo != JOIN_ALGO_UNKNOWN) {
return false;
}
return true;
}
int32_t stbJoinOptAddFuncToScanNode(char* funcName, SScanLogicNode* pScan) {
SFunctionNode* pUidFunc = createFunction(funcName, NULL);
snprintf(pUidFunc->node.aliasName, sizeof(pUidFunc->node.aliasName), "%s.%p",
pUidFunc->functionName, pUidFunc);
nodesListStrictAppend(pScan->pScanPseudoCols, (SNode *)pUidFunc);
return createColumnByRewriteExpr((SNode*)pUidFunc, &pScan->node.pTargets);
}
int32_t stbJoinOptRewriteToTagScan(SLogicNode* pJoin, SNode* pNode) {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
SJoinLogicNode* pJoinNode = (SJoinLogicNode*)pJoin;
@ -2990,6 +3066,9 @@ int32_t stbJoinOptRewriteToTagScan(SLogicNode* pJoin, SNode* pNode) {
SNodeList* pTags = nodesMakeList();
int32_t code = nodesCollectColumnsFromNode(pJoinNode->pTagEqCond, NULL, COLLECT_COL_TYPE_TAG, &pTags);
if (TSDB_CODE_SUCCESS == code) {
code = nodesCollectColumnsFromNode(pJoinNode->pTagOnCond, NULL, COLLECT_COL_TYPE_TAG, &pTags);
}
if (TSDB_CODE_SUCCESS == code) {
SNode* pTarget = NULL;
SNode* pTag = NULL;
@ -3010,18 +3089,10 @@ int32_t stbJoinOptRewriteToTagScan(SLogicNode* pJoin, SNode* pNode) {
}
}
if (TSDB_CODE_SUCCESS == code) {
SFunctionNode* pUidFunc = createFunction("_tbuid", NULL);
snprintf(pUidFunc->node.aliasName, sizeof(pUidFunc->node.aliasName), "%s.%p",
pUidFunc->functionName, pUidFunc);
nodesListStrictAppend(pScan->pScanPseudoCols, (SNode *)pUidFunc);
code = createColumnByRewriteExpr(pUidFunc, &pScan->node.pTargets);
code = stbJoinOptAddFuncToScanNode("_tbuid", pScan);
}
if (TSDB_CODE_SUCCESS == code) {
SFunctionNode* pVgidFunc = createFunction("_vgid", NULL);
snprintf(pVgidFunc->node.aliasName, sizeof(pVgidFunc->node.aliasName), "%s.%p",
pVgidFunc->functionName, pVgidFunc);
nodesListStrictAppend(pScan->pScanPseudoCols, (SNode *)pVgidFunc);
code = createColumnByRewriteExpr(pVgidFunc, &pScan->node.pTargets);
code = stbJoinOptAddFuncToScanNode("_vgid", pScan);
}
if (code) {
@ -3071,6 +3142,7 @@ static int32_t stbJoinOptCreateTagHashJoinNode(SLogicNode* pOrig, SNodeList* pCh
pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
pJoin->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
pJoin->pTagEqCond = nodesCloneNode(pOrigJoin->pTagEqCond);
pJoin->pTagOnCond = nodesCloneNode(pOrigJoin->pTagOnCond);
int32_t code = TSDB_CODE_SUCCESS;
pJoin->node.pChildren = pChildren;
@ -3090,6 +3162,7 @@ static int32_t stbJoinOptCreateTagHashJoinNode(SLogicNode* pOrig, SNodeList* pCh
if (code) {
break;
}
pScan->node.pParent = (SLogicNode*)pJoin;
}
if (TSDB_CODE_SUCCESS == code) {
@ -3107,27 +3180,36 @@ static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppL
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
SNode* pNode = NULL;
FOREACH(pNode, pList) {
code = stbJoinOptAddUidToScan(pJoin, pNode);
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
code = stbJoinOptAddFuncToScanNode("_tbuid", pScan);
if (code) {
break;
}
pScan->node.dynamicOp = true;
}
*ppList = pList;
if (TSDB_CODE_SUCCESS == code) {
*ppList = pList;
} else {
nodesDestroyList(pList);
*ppList = NULL;
}
return TSDB_CODE_SUCCESS;
return code;
}
static int32_t stbJoinOptCreateGroupCacheNode(SLogicNode* pOrig, SNodeList* pChildren, SLogicNode** ppLogic) {
SJoinLogicNode* pOrigJoin = (SJoinLogicNode*)pOrig;
SGroupCacheLogicNode* pGrpCache = (SJoinLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_GROUP_CACHE);
static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode** ppLogic) {
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheLogicNode* pGrpCache = (SGroupCacheLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_GROUP_CACHE);
if (NULL == pGrpCache) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
pGrpCache->node.dynamicOp = true;
pGrpCache->node.pChildren = pChildren;
pGrpCache->node.pTargets = nodesMakeList();
if (NULL == pGrpCache->node.pTargets) {
code = TSDB_CODE_OUT_OF_MEMORY;
@ -3136,7 +3218,24 @@ static int32_t stbJoinOptCreateGroupCacheNode(SLogicNode* pOrig, SNodeList* pChi
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pChildren, 0);
code = nodesListStrictAppendList(pGrpCache->node.pTargets, nodesCloneList(pScan->node.pTargets));
}
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pChildren, 0);
SNode* pCol = NULL;
FOREACH(pCol, pScan->pScanPseudoCols) {
if (QUERY_NODE_FUNCTION == nodeType(pCol) && (((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_TBUID || ((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_VGID)) {
code = createColumnByRewriteExpr(pCol, &pGrpCache->pGroupCols);
if (code) {
break;
}
}
}
SNode* pNode = NULL;
FOREACH(pNode, pChildren) {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
pScan->node.pParent = (SLogicNode*)pGrpCache;
}
if (TSDB_CODE_SUCCESS == code) {
*ppLogic = (SLogicNode*)pGrpCache;
} else {
@ -3146,20 +3245,95 @@ static int32_t stbJoinOptCreateGroupCacheNode(SLogicNode* pOrig, SNodeList* pChi
return code;
}
static void stbJoinOptRemoveTagEqCond(SJoinLogicNode* pJoin) {
if (QUERY_NODE_OPERATOR == nodeType(pJoin->pOtherOnCond) && nodesEqualNode(pJoin->pOtherOnCond, pJoin->pTagEqCond)) {
NODES_DESTORY_NODE(pJoin->pOtherOnCond);
return;
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOtherOnCond)) {
SLogicConditionNode* pLogic = (SLogicConditionNode*)pJoin->pOtherOnCond;
SNode* pNode = NULL;
FOREACH(pNode, pLogic->pParameterList) {
if (nodesEqualNode(pNode, pJoin->pTagEqCond)) {
ERASE_NODE(pLogic->pParameterList);
break;
} else if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pTagEqCond)) {
SLogicConditionNode* pTags = (SLogicConditionNode*)pJoin->pTagEqCond;
SNode* pTag = NULL;
FOREACH(pTag, pTags->pParameterList) {
if (nodesEqualNode(pTag, pNode)) {
ERASE_NODE(pLogic->pParameterList);
break;
}
}
}
}
static int32_t stbJoinOptCreateDynTaskCtrlNode(SLogicNode* pJoin, SLogicNode* pHJoinNode, SLogicNode* pMJoinNode, SLogicNode** ppDynNode) {
if (pLogic->pParameterList->length <= 0) {
NODES_DESTORY_NODE(pJoin->pOtherOnCond);
}
}
}
static int32_t stbJoinOptCreateMergeJoinNode(SLogicNode* pOrig, SLogicNode* pChild, SLogicNode** ppLogic) {
SJoinLogicNode* pOrigJoin = (SJoinLogicNode*)pOrig;
SJoinLogicNode* pJoin = (SJoinLogicNode*)nodesCloneNode((SNode*)pOrig);
if (NULL == pJoin) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pJoin->joinAlgo = JOIN_ALGO_MERGE;
pJoin->node.dynamicOp = true;
stbJoinOptRemoveTagEqCond(pJoin);
NODES_DESTORY_NODE(pJoin->pTagEqCond);
SNode* pNode = NULL;
FOREACH(pNode, pJoin->node.pChildren) {
ERASE_NODE(pJoin->node.pChildren);
}
nodesListStrictAppend(pJoin->node.pChildren, (SNode *)pChild);
pChild->pParent = (SLogicNode*)pJoin;
*ppLogic = (SLogicNode*)pJoin;
return TSDB_CODE_SUCCESS;
}
static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* pPost, SLogicNode** ppDynNode) {
int32_t code = TSDB_CODE_SUCCESS;
SDynQueryCtrlLogicNode* pDynCtrl = (SDynQueryCtrlLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL);
if (NULL == pDynCtrl) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pDynCtrl->qType = DYN_QTYPE_STB_HASH;
if (TSDB_CODE_SUCCESS == code) {
pDynNode->pChildren = nodesMakeList();
if (NULL == pDynNode->pChildren) {
pDynCtrl->node.pChildren = nodesMakeList();
if (NULL == pDynCtrl->node.pChildren) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListStrictAppend(pDynNode->pChildren, (SNode*)pHJoinNode);
nodesListStrictAppend(pDynCtrl->node.pChildren, (SNode*)pPrev);
nodesListStrictAppend(pDynCtrl->node.pChildren, (SNode*)pPost);
pDynCtrl->node.pTargets = nodesCloneList(pPost->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
pPrev->pParent = (SLogicNode*)pDynCtrl;
pPost->pParent = (SLogicNode*)pDynCtrl;
*ppDynNode = (SLogicNode*)pDynCtrl;
} else {
nodesDestroyNode((SNode*)pDynCtrl);
*ppDynNode = NULL;
}
return code;
}
static int32_t stbJoinOptRewriteStableJoin(SOptimizeContext* pCxt, SLogicNode* pJoin, SLogicSubplan* pLogicSubplan) {
@ -3174,16 +3348,16 @@ static int32_t stbJoinOptRewriteStableJoin(SOptimizeContext* pCxt, SLogicNode* p
code = stbJoinOptCreateTagHashJoinNode(pJoin, pTagScanNodes, &pHJoinNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = stbJoinOptCreateTableScanNodes(pJoin, pTbScanNodes);
code = stbJoinOptCreateTableScanNodes(pJoin, &pTbScanNodes);
}
if (TSDB_CODE_SUCCESS == code) {
code = stbJoinOptCreateGroupCacheNode(pJoin, pTbScanNodes, &pGrpCacheNode);
code = stbJoinOptCreateGroupCacheNode(pTbScanNodes, &pGrpCacheNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = stbJoinOptCreateMergeJoinNode(pJoin, pGrpCacheNode, &pMJoinNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = stbJoinOptCreateDynTaskCtrlNode(pJoin, pHJoinNode, pMJoinNode, &pDynNode);
code = stbJoinOptCreateDynQueryCtrlNode(pHJoinNode, pMJoinNode, &pDynNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = replaceLogicNode(pLogicSubplan, pJoin, (SLogicNode*)pDynNode);
@ -3215,14 +3389,14 @@ static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "PartitionTags", .optimizeFunc = partTagsOptimize},
{.pName = "StableJoin", .optimizeFunc = stableJoinOptimize},
{.pName = "MergeProjects", .optimizeFunc = mergeProjectsOptimize},
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
{.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize},
{.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize},
{.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize},
{.pName = "TagScan", .optimizeFunc = tagScanOptimize},
{.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize},
{.pName = "TableCountScan", .optimizeFunc = tableCountScanOptimize},
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
};
// clang-format on
@ -3257,6 +3431,7 @@ static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicSubplan* pLogicSubpla
if (cxt.optimized) {
optimized = true;
dumpLogicSubplan(optimizeRuleSet[i].pName, pLogicSubplan);
break;
}
}
} while (optimized);

View File

@ -708,7 +708,8 @@ static int32_t mergeEqCond(SNode** ppDst, SNode** ppSrc) {
return TSDB_CODE_SUCCESS;
}
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
SPhysiNode** pPhyNode) {
SSortMergeJoinPhysiNode* pJoin =
(SSortMergeJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN);
@ -730,32 +731,6 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
&pJoin->pTargets);
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) {
SNodeList* pCondCols = nodesMakeList();
SNodeList* pTargets = NULL;
SNodeList* pFinTargets = NULL;
if (NULL == pCondCols) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
code = nodesCollectColumnsFromNode(pJoinLogicNode->pOtherOnCond, NULL, COLLECT_COL_TYPE_ALL, &pCondCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pCondCols, &pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pTargets, &pFinTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListStrictAppendList(pJoin->pTargets, pFinTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pCondCols, pJoin->node.pOutputDataBlockDesc);
}
nodesDestroyList(pTargets);
nodesDestroyList(pCondCols);
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) {
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1,
pJoinLogicNode->pOtherOnCond, &pJoin->pOtherOnCond);
@ -784,6 +759,72 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
return code;
}
static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
SPhysiNode** pPhyNode) {
SHashJoinPhysiNode* pJoin =
(SHashJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN);
if (NULL == pJoin) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SDataBlockDescNode* pLeftDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc;
int32_t code = TSDB_CODE_SUCCESS;
pJoin->joinType = pJoinLogicNode->joinType;
pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder;
SNode* pPrimKeyCond = NULL;
SNode* pColEqCond = NULL;
SNode* pTagEqCond = NULL;
SNode* pTagOnCond = NULL;
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pPrimKeyCond);
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pColEqCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqCond, &pTagEqCond);
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pTagOnCond) {
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pTagOnCond, &pTagOnCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets, &pJoin->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
}
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pJoin;
} else {
nodesDestroyNode((SNode*)pJoin);
}
return code;
}
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
SPhysiNode** pPhyNode) {
switch (pJoinLogicNode->joinAlgo) {
case JOIN_ALGO_MERGE:
return createMergeJoinPhysiNode(pCxt, pChildren, pJoinLogicNode, pPhyNode);
case JOIN_ALGO_HASH:
return createHashJoinPhysiNode(pCxt, pChildren, pJoinLogicNode, pPhyNode);
default:
planError("Invalid join algorithm:%d", pJoinLogicNode->joinAlgo);
break;
}
return TSDB_CODE_FAILED;
}
typedef struct SRewritePrecalcExprsCxt {
int32_t errCode;
int32_t planNodeId;

View File

@ -280,7 +280,7 @@ static bool stbSplNeedSplitWindow(bool streamQuery, SLogicNode* pNode) {
}
static bool stbSplNeedSplitJoin(bool streamQuery, SJoinLogicNode* pJoin) {
if (pJoin->isSingleTableJoin) {
if (pJoin->isSingleTableJoin || JOIN_ALGO_HASH == pJoin->joinAlgo) {
return false;
}
SNode* pChild = NULL;