enh: optimize inner join cond

This commit is contained in:
dapan1121 2024-04-29 15:44:12 +08:00
parent 0997355460
commit a35dd72147
1 changed files with 110 additions and 14 deletions

View File

@ -27,6 +27,7 @@
#define OPTIMIZE_FLAG_PUSH_DOWN_CONDE OPTIMIZE_FLAG_MASK(1)
#define OPTIMIZE_FLAG_STB_JOIN OPTIMIZE_FLAG_MASK(2)
#define OPTIMIZE_FLAG_ELIMINATE_PROJ OPTIMIZE_FLAG_MASK(3)
#define OPTIMIZE_FLAG_JOIN_COND OPTIMIZE_FLAG_MASK(4)
#define OPTIMIZE_FLAG_SET_MASK(val, mask) (val) |= (mask)
#define OPTIMIZE_FLAG_CLEAR_MASK(val, mask) (val) &= (~(mask))
@ -2457,27 +2458,86 @@ static int32_t sortForJoinOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
return sortForJoinOptimizeImpl(pCxt, pLogicSubplan, pJoin);
}
static SScanLogicNode* joinCondGetScanNode(SLogicNode* pNode) {
switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN:
return (SScanLogicNode*)pNode;
case QUERY_NODE_LOGIC_PLAN_JOIN: {
SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
if (JOIN_TYPE_INNER != pJoin->joinType) {
return NULL;
}
return joinCondGetScanNode((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0));
}
default:
return NULL;
}
}
static int32_t joinCondGetAllScanNodes(SLogicNode* pNode, SNodeList** pList) {
switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN:
return nodesListMakeStrictAppend(pList, (SNode*)pNode);
case QUERY_NODE_LOGIC_PLAN_JOIN: {
SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
if (JOIN_TYPE_INNER != pJoin->joinType) {
return TSDB_CODE_SUCCESS;
}
int32_t code = joinCondGetAllScanNodes((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0), pList);
if (TSDB_CODE_SUCCESS == code) {
code = joinCondGetAllScanNodes((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), pList);
}
return code;
}
default:
return TSDB_CODE_SUCCESS;
}
}
static bool joinCondMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode)) {
if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode) || OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_JOIN_COND)) {
return false;
}
SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
if (pNode->pChildren->length != 2 || JOIN_STYPE_ASOF == pJoin->subType || JOIN_STYPE_WIN == pJoin->subType || JOIN_TYPE_FULL == pJoin->joinType) {
OPTIMIZE_FLAG_SET_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_JOIN_COND);
return false;
}
SLogicNode* pLeft = (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0);
SLogicNode* pRight = (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1);
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pLeft) || QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pRight)) {
if ((JOIN_TYPE_LEFT == pJoin->joinType || JOIN_TYPE_RIGHT == pJoin->joinType) && (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pLeft) || QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pRight))) {
OPTIMIZE_FLAG_SET_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_JOIN_COND);
return false;
}
SScanLogicNode* pLScan = (SScanLogicNode*)pLeft;
SScanLogicNode* pRScan = (SScanLogicNode*)pRight;
if (JOIN_TYPE_INNER == pJoin->joinType && ((QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pLeft) && QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pLeft)) || (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pRight) && QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pRight)))) {
OPTIMIZE_FLAG_SET_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_JOIN_COND);
return false;
}
if (JOIN_TYPE_INNER == pJoin->joinType) {
if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pLeft) && !OPTIMIZE_FLAG_TEST_MASK(pLeft->optimizedFlag, OPTIMIZE_FLAG_JOIN_COND)) {
return false;
}
if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pRight) && !OPTIMIZE_FLAG_TEST_MASK(pRight->optimizedFlag, OPTIMIZE_FLAG_JOIN_COND)) {
return false;
}
}
SScanLogicNode* pLScan = joinCondGetScanNode(pLeft);
SScanLogicNode* pRScan = joinCondGetScanNode(pRight);
if (NULL == pLScan || NULL == pRScan) {
OPTIMIZE_FLAG_SET_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_JOIN_COND);
return false;
}
if (!IS_TSWINDOW_SPECIFIED(pLScan->scanRange) && !IS_TSWINDOW_SPECIFIED(pRScan->scanRange)) {
OPTIMIZE_FLAG_SET_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_JOIN_COND);
return false;
}
@ -2499,24 +2559,60 @@ static int32_t joinCondOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
return TSDB_CODE_SUCCESS;
}
SScanLogicNode* pLScan = (SScanLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0);
SScanLogicNode* pRScan = (SScanLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1);
switch (pJoin->joinType) {
case JOIN_TYPE_INNER:
joinCondMergeScanRand(&pLScan->scanRange, &pRScan->scanRange);
pRScan->scanRange.skey = pLScan->scanRange.skey;
pRScan->scanRange.ekey = pLScan->scanRange.ekey;
break;
case JOIN_TYPE_LEFT:
joinCondMergeScanRand(&pRScan->scanRange, &pLScan->scanRange);
break;
case JOIN_TYPE_RIGHT:
joinCondMergeScanRand(&pLScan->scanRange, &pRScan->scanRange);
break;
default:
case JOIN_TYPE_INNER: {
SNodeList* pScanList = NULL;
int32_t code = joinCondGetAllScanNodes((SLogicNode*)pJoin, &pScanList);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
if (NULL == pScanList || pScanList->length <= 0) {
nodesClearList(pScanList);
return TSDB_CODE_SUCCESS;
}
SNode* pNode = NULL;
STimeWindow scanRange = TSWINDOW_INITIALIZER;
FOREACH(pNode, pScanList) {
joinCondMergeScanRand(&scanRange, &((SScanLogicNode*)pNode)->scanRange);
}
FOREACH(pNode, pScanList) {
((SScanLogicNode*)pNode)->scanRange.skey = scanRange.skey;
((SScanLogicNode*)pNode)->scanRange.ekey = scanRange.ekey;
}
nodesClearList(pScanList);
break;
}
case JOIN_TYPE_LEFT: {
SLogicNode* pLeft = (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0);
SLogicNode* pRight = (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1);
SScanLogicNode* pLScan = joinCondGetScanNode(pLeft);
SScanLogicNode* pRScan = joinCondGetScanNode(pRight);
if (NULL == pLScan || NULL == pRScan) {
return TSDB_CODE_SUCCESS;
}
joinCondMergeScanRand(&pRScan->scanRange, &pLScan->scanRange);
break;
}
case JOIN_TYPE_RIGHT: {
SLogicNode* pLeft = (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0);
SLogicNode* pRight = (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1);
SScanLogicNode* pLScan = joinCondGetScanNode(pLeft);
SScanLogicNode* pRScan = joinCondGetScanNode(pRight);
if (NULL == pLScan || NULL == pRScan) {
return TSDB_CODE_SUCCESS;
}
joinCondMergeScanRand(&pLScan->scanRange, &pRScan->scanRange);
break;
}
default:
return TSDB_CODE_SUCCESS;
}
OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_JOIN_COND);
pCxt->optimized = true;
return TSDB_CODE_SUCCESS;
}