diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index ef2730086b..0244823e29 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -65,6 +65,7 @@ typedef struct SGcDownstreamCtx { SHashObj* pSessions; SHashObj* pWaitSessions; SGcFileCacheCtx fileCtx; + bool fetchDone; } SGcDownstreamCtx; typedef struct SGcVgroupCtx { diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index aee60c02cc..52d759d4aa 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -821,6 +821,7 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* taosArrayDestroy(pParam->pChildren); pParam->pChildren = NULL; + pCtx->fetchDone = false; } return TSDB_CODE_SUCCESS; @@ -910,6 +911,7 @@ static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSes while (NULL != (pGroup = taosHashIterate(pGrpHash, pGroup))) { handleGroupFetchDone(pGroup); } + pCtx->fetchDone = true; } else { int32_t uidNum = 0; SGcVgroupCtx* pVgCtx = NULL; @@ -979,33 +981,41 @@ static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64 SGroupCacheOperatorInfo* pGCache = pOperator->info; *got = true; - if (pSession->pGroupData->needCache) { - SGcBlkList* pBlkList = &pSession->pGroupData->blkList; - taosRLockLatch(&pBlkList->lock); - int64_t blkNum = taosArrayGetSize(pBlkList->pList); - if (pSession->lastBlkId < 0) { - if (blkNum > 0) { - SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, 0); + if (NULL != pSession->pGroupData) { + if (pSession->pGroupData->needCache) { + SGcBlkList* pBlkList = &pSession->pGroupData->blkList; + taosRLockLatch(&pBlkList->lock); + int64_t blkNum = taosArrayGetSize(pBlkList->pList); + if (pSession->lastBlkId < 0) { + if (blkNum > 0) { + SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, 0); + taosRUnLockLatch(&pBlkList->lock); + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes); + pSession->lastBlkId = 0; + return code; + } + } else if ((pSession->lastBlkId + 1) < blkNum) { + SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, pSession->lastBlkId + 1); taosRUnLockLatch(&pBlkList->lock); code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes); - pSession->lastBlkId = 0; + pSession->lastBlkId++; return code; } - } else if ((pSession->lastBlkId + 1) < blkNum) { - SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, pSession->lastBlkId + 1); taosRUnLockLatch(&pBlkList->lock); - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes); - pSession->lastBlkId++; + } else if (pSession->pGroupData->pBlock) { + *ppRes = pSession->pGroupData->pBlock; + pSession->pGroupData->pBlock = NULL; + return TSDB_CODE_SUCCESS; + } + + if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) { + *ppRes = NULL; + qDebug("sessionId: %" PRIu64 " fetch done", sessionId); return code; } - taosRUnLockLatch(&pBlkList->lock); - } else if (pSession->pGroupData->pBlock) { - *ppRes = pSession->pGroupData->pBlock; - pSession->pGroupData->pBlock = NULL; - } - - if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) { + } else { *ppRes = NULL; + qDebug("sessionId: %" PRIu64 " fetch done since downstream fetch done", sessionId); return code; } @@ -1136,13 +1146,17 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid)); - if (NULL == pGroup) { + if (NULL == pGroup && NULL != pParam->pChildren && !pCtx->fetchDone) { code = addNewGroupData(pOperator, pParam, &pGroup, pGCache->batchFetch ? GROUP_CACHE_DEFAULT_VGID : pGcParam->vgId, pGcParam->tbUid); if (TSDB_CODE_SUCCESS != code) { return code; } } + if (NULL == pGroup) { + return TSDB_CODE_SUCCESS; + } + initGroupCacheSessionCtx(&ctx, pGcParam, pGroup); code = taosHashPut(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId), &ctx, sizeof(ctx)); @@ -1166,6 +1180,10 @@ static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock if (TSDB_CODE_SUCCESS != code) { return code; } + if (NULL == pSession) { + qDebug("session %" PRId64 " in downstream %d total got 0 rows since downtream fetch done", pGcParam->sessionId, pCtx->id); + return TSDB_CODE_SUCCESS; + } } else if (pSession->pGroupData->needCache) { SSDataBlock** ppBlock = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); if (ppBlock) { diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index fff83454e9..a45e25f597 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -66,7 +66,7 @@ enum { }; #define COL_DISPLAY_WIDTH 18 -#define JT_MAX_LOOP 1000000 +#define JT_MAX_LOOP 100000 #define LEFT_BLK_ID 0 #define RIGHT_BLK_ID 1 @@ -206,7 +206,7 @@ typedef struct { SJoinTestCtx jtCtx = {0}; -SJoinTestCtrl jtCtrl = {1, 1, 1, 0, 0}; +SJoinTestCtrl jtCtrl = {0, 0, 0, 0, 0}; SJoinTestStat jtStat = {0}; SJoinTestResInfo jtRes = {0}; @@ -2862,7 +2862,7 @@ void runSingleTest(char* caseName, SJoinTestParam* param) { bool contLoop = true; SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param); - createDummyBlkList(20, 20, 20, 20, 3); + createDummyBlkList(10000, 10000, 10000, 10000, 4096); while (contLoop) { rerunBlockedHere(); @@ -2895,7 +2895,7 @@ void handleCaseEnd() { } // namespace -#if 0 +#if 1 #if 1 TEST(innerJoin, noCondTest) { SJoinTestParam param; @@ -3006,7 +3006,7 @@ TEST(innerJoin, fullCondTest) { #endif -#if 0 +#if 1 #if 1 TEST(leftOuterJoin, noCondTest) { SJoinTestParam param; @@ -3117,7 +3117,7 @@ TEST(leftOuterJoin, fullCondTest) { #endif #endif -#if 0 +#if 1 #if 1 TEST(fullOuterJoin, noCondTest) { SJoinTestParam param; @@ -3229,7 +3229,7 @@ TEST(fullOuterJoin, fullCondTest) { #endif #if 1 -#if 0 +#if 1 TEST(leftSemiJoin, noCondTest) { SJoinTestParam param; char* caseName = "leftSemiJoin:noCondTest"; @@ -3339,7 +3339,7 @@ TEST(leftSemiJoin, fullCondTest) { #endif #endif -#if 0 +#if 1 #if 1 TEST(leftAntiJoin, noCondTest) { SJoinTestParam param; @@ -3450,7 +3450,7 @@ TEST(leftAntiJoin, fullCondTest) { #endif #endif -#if 0 +#if 1 #if 1 TEST(leftAsofJoin, noCondGreaterThanTest) { SJoinTestParam param; @@ -3610,7 +3610,7 @@ TEST(leftAsofJoin, noCondLowerEqTest) { #endif -#if 0 +#if 1 #if 1 TEST(leftWinJoin, noCondProjectionTest) { SJoinTestParam param; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 2d1f528461..3ea35b32df 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1036,6 +1036,11 @@ static void setColumnInfoByExpr(STempTableNode* pTable, SExprNode* pExpr, SColum strcpy(pCol->tableAlias, pTable->table.tableAlias); pCol->isPrimTs = isPrimaryKeyImpl((SNode*)pExpr); pCol->colId = pCol->isPrimTs ? PRIMARYKEY_TIMESTAMP_COL_ID : 0; + if (QUERY_NODE_COLUMN == nodeType(pExpr)) { + pCol->colType = ((SColumnNode*)pExpr)->colType; + strcpy(pCol->dbName, ((SColumnNode*)pExpr)->dbName); + strcpy(pCol->tableName, ((SColumnNode*)pExpr)->tableName); + } strcpy(pCol->colName, pExpr->aliasName); if ('\0' == pCol->node.aliasName[0]) { strcpy(pCol->node.aliasName, pCol->colName); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index ed8ce25769..1aa7e9e410 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -25,6 +25,7 @@ #define OPTIMIZE_FLAG_SCAN_PATH OPTIMIZE_FLAG_MASK(0) #define OPTIMIZE_FLAG_PUSH_DOWN_CONDE OPTIMIZE_FLAG_MASK(1) +#define OPTIMIZE_FLAG_STB_JOIN OPTIMIZE_FLAG_MASK(2) #define OPTIMIZE_FLAG_SET_MASK(val, mask) (val) |= (mask) #define OPTIMIZE_FLAG_CLEAR_MASK(val, mask) (val) &= (~(mask)) @@ -108,8 +109,8 @@ static SJoinOptimizeOpt gJoinWhereOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] static SJoinOptimizeOpt gJoinOnOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = { /* NONE OUTER SEMI ANTI ASOF WINDOW */ /*INNER*/ {{PUSH_DONW_FLT_COND}, {0}, {0}, {0}, {0}, {0}}, -/*LEFT*/ {{0}, {0}, {PUSH_DONW_FLT_COND}, {0}, {0}, {0}}, -/*RIGHT*/ {{0}, {0}, {PUSH_DONW_FLT_COND}, {0}, {0}, {0}}, +/*LEFT*/ {{0}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DONW_FLT_COND}, {PUSH_DOWN_RIGHT_FLT}, {0}, {0}}, +/*RIGHT*/ {{0}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DONW_FLT_COND}, {PUSH_DOWN_LEFT_FLT}, {0}, {0}}, /*FULL*/ {{0}, {0}, {0}, {0}, {0}, {0}}, }; @@ -544,6 +545,14 @@ static int32_t pdcDealScan(SOptimizeContext* pCxt, SScanLogicNode* pScan) { static bool pdcColBelongThisTable(SNode* pCondCol, SNodeList* pTableCols) { SNode* pTableCol = NULL; FOREACH(pTableCol, pTableCols) { + if (QUERY_NODE_COLUMN == nodeType(pCondCol) && QUERY_NODE_COLUMN == nodeType(pTableCol)) { + SColumnNode* pCondColNode = (SColumnNode*)pCondCol; + SColumnNode* pTblColNode = (SColumnNode*)pTableCol; + if (0 == strcmp(pCondColNode->tableAlias, pTblColNode->tableAlias) && 0 == strcmp(pCondColNode->colName, pTblColNode->colName)) { + return true; + } + } + if (nodesEqualNode(pCondCol, pTableCol)) { return true; } @@ -1427,9 +1436,15 @@ static int32_t pdcRewriteTypeBasedOnConds(SOptimizeContext* pCxt, SJoinLogicNode } break; case JOIN_TYPE_FULL: - if (tableCondTypes[0] && !tableCondTypes[1] && tableCondTypes[2] && !tableCondTypes[3]) { - pJoin->joinType = JOIN_TYPE_INNER; - pJoin->subType = JOIN_STYPE_NONE; + if (tableCondTypes[0] && !tableCondTypes[1]) { + if (tableCondTypes[2] && !tableCondTypes[3]) { + pJoin->joinType = JOIN_TYPE_INNER; + pJoin->subType = JOIN_STYPE_NONE; + } else { + pJoin->joinType = JOIN_TYPE_LEFT; + } + } else if (tableCondTypes[2] && !tableCondTypes[3]) { + pJoin->joinType = JOIN_TYPE_RIGHT; } break; default: @@ -1454,16 +1469,18 @@ static int32_t pdcDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { SNode* pRightChildCond = NULL; int32_t code = pdcJoinCheckAllCond(pCxt, pJoin); while (true) { - if (TSDB_CODE_SUCCESS == code && NULL != pJoin->node.pConditions && 0 != gJoinWhereOpt[t][s].pushDownFlag) { - code = pdcJoinSplitCond(pJoin, &pJoin->node.pConditions, &pOnCond, &pLeftChildCond, &pRightChildCond, true); - if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) { - code = pdcJoinPushDownOnCond(pCxt, pJoin, &pOnCond); - } - if (TSDB_CODE_SUCCESS == code && NULL != pLeftChildCond) { - code = pdcPushDownCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0), &pLeftChildCond); - } - if (TSDB_CODE_SUCCESS == code && NULL != pRightChildCond) { - code = pdcPushDownCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond); + if (TSDB_CODE_SUCCESS == code && NULL != pJoin->node.pConditions) { + if (0 != gJoinWhereOpt[t][s].pushDownFlag) { + code = pdcJoinSplitCond(pJoin, &pJoin->node.pConditions, &pOnCond, &pLeftChildCond, &pRightChildCond, true); + if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) { + code = pdcJoinPushDownOnCond(pCxt, pJoin, &pOnCond); + } + if (TSDB_CODE_SUCCESS == code && NULL != pLeftChildCond) { + code = pdcPushDownCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0), &pLeftChildCond); + } + if (TSDB_CODE_SUCCESS == code && NULL != pRightChildCond) { + code = pdcPushDownCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond); + } } if (TSDB_CODE_SUCCESS == code && NULL != pJoin->node.pConditions) { code = pdcRewriteTypeBasedOnConds(pCxt, pJoin); @@ -1478,7 +1495,7 @@ static int32_t pdcDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { s = pJoin->subType; } - if (TSDB_CODE_SUCCESS == code && NULL != pJoin->pFullOnCond) { + if (TSDB_CODE_SUCCESS == code && NULL != pJoin->pFullOnCond && 0 != gJoinOnOpt[t][s].pushDownFlag) { code = pdcJoinSplitCond(pJoin, &pJoin->pFullOnCond, NULL, &pLeftChildCond, &pRightChildCond, false); if (TSDB_CODE_SUCCESS == code && NULL != pLeftChildCond) { code = pdcPushDownCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0), &pLeftChildCond); @@ -2207,6 +2224,69 @@ static int32_t sortForJoinOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic return sortForJoinOptimizeImpl(pCxt, pLogicSubplan, pJoin); } +static bool joinCondMayBeOptimized(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode)) { + return false; + } + + SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode; + if (pNode->pChildren->length != 2 || JOIN_ALGO_HASH == pJoin->joinAlgo || JOIN_TYPE_FULL == pJoin->joinType) { + return false; + } + + SLogicNode* pLeft = (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0); + SLogicNode* pRight = (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1); + + if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pLeft) || QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pRight)) { + return false; + } + + SScanLogicNode* pLScan = (SScanLogicNode*)pLeft; + SScanLogicNode* pRScan = (SScanLogicNode*)pRight; + + if (!IS_TSWINDOW_SPECIFIED(pLScan->scanRange) && !IS_TSWINDOW_SPECIFIED(pLScan->scanRange)) { + return false; + } + + return true; +} + +static void joinCondMergeScanRand(STimeWindow* pDst, STimeWindow* pSrc) { + if (pSrc->skey > pDst->skey) { + pDst->skey = pSrc->skey; + } + if (pSrc->ekey < pDst->ekey) { + pDst->ekey = pSrc->ekey; + } +} + +static int32_t joinCondOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { + SJoinLogicNode* pJoin = (SJoinLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, joinCondMayBeOptimized); + if (NULL == pJoin) { + return TSDB_CODE_SUCCESS; + } + + SScanLogicNode* pLScan = (SScanLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0); + SScanLogicNode* pRScan = (SScanLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1); + + switch (pJoin->joinType) { + case JOIN_TYPE_INNER: + joinCondMergeScanRand(&pLScan->scanRange, &pRScan->scanRange); + pRScan->scanRange.skey = pLScan->scanRange.skey; + pRScan->scanRange.ekey = pLScan->scanRange.ekey; + break; + case JOIN_TYPE_LEFT: + joinCondMergeScanRand(&pRScan->scanRange, &pLScan->scanRange); + break; + case JOIN_TYPE_RIGHT: + joinCondMergeScanRand(&pLScan->scanRange, &pRScan->scanRange); + break; + default: + break; + } + + return TSDB_CODE_SUCCESS; +} static bool smaIndexOptMayBeOptimized(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode) || NULL == pNode->pParent || @@ -2666,15 +2746,19 @@ static bool eliminateProjOptCheckProjColumnNames(SProjectLogicNode* pProjectNode } static bool eliminateProjOptMayBeOptimized(SLogicNode* pNode) { - // TODO: enable this optimization after new mechanising that map projection and targets of project node - if (NULL != pNode->pParent) { + // Super table scan requires project operator to merge packets to improve performance. + if (NULL == pNode->pParent && (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) || + (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0)) && + TSDB_SUPER_TABLE == ((SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0))->tableType))) { return false; } - // Super table scan requires project operator to merge packets to improve performance. - if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) || - (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0)) && - TSDB_SUPER_TABLE == ((SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0))->tableType)) { + if (NULL != pNode->pParent && (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) || + QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0)))) { + return false; + } + + if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) { return false; } @@ -2729,6 +2813,7 @@ static bool eliminateProjOptCanChildConditionUseChildTargets(SLogicNode* pChild, nodesWalkExpr(pChild->pConditions, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); if (!cxt.canUse) return false; } + if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild) && ((SJoinLogicNode*)pChild)->joinAlgo != JOIN_ALGO_UNKNOWN) { return false; } @@ -2763,29 +2848,46 @@ static void alignProjectionWithTarget(SLogicNode* pNode) { } } +static EDealRes eliminateProjOptRewriteScanTableAlias(SNode* pNode, void* pContext) { + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + SColumnNode* pCol = (SColumnNode*)pNode; + strcpy(pCol->tableAlias, (char*)pContext); + } + return DEAL_RES_CONTINUE; +} + static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SProjectLogicNode* pProjectNode) { SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProjectNode->node.pChildren, 0); SNodeList* pNewChildTargets = nodesMakeList(); - SNode* pProjection = NULL; - FOREACH(pProjection, pProjectNode->pProjections) { - SNode* pChildTarget = NULL; - FOREACH(pChildTarget, pChild->pTargets) { - if (0 == strcmp(((SColumnNode*)pProjection)->colName, ((SColumnNode*)pChildTarget)->colName)) { - nodesListAppend(pNewChildTargets, nodesCloneNode(pChildTarget)); - break; + if (NULL == pProjectNode->node.pParent) { + SNode* pProjection = NULL; + FOREACH(pProjection, pProjectNode->pProjections) { + SNode* pChildTarget = NULL; + FOREACH(pChildTarget, pChild->pTargets) { + if (0 == strcmp(((SColumnNode*)pProjection)->colName, ((SColumnNode*)pChildTarget)->colName)) { + nodesListAppend(pNewChildTargets, nodesCloneNode(pChildTarget)); + break; + } } } - } - if (eliminateProjOptCanChildConditionUseChildTargets(pChild, pNewChildTargets)) { - nodesDestroyList(pChild->pTargets); - pChild->pTargets = pNewChildTargets; - } else { - nodesDestroyList(pNewChildTargets); - return TSDB_CODE_SUCCESS; - } + if (eliminateProjOptCanChildConditionUseChildTargets(pChild, pNewChildTargets)) { + nodesDestroyList(pChild->pTargets); + pChild->pTargets = pNewChildTargets; + } else { + nodesDestroyList(pNewChildTargets); + return TSDB_CODE_SUCCESS; + } + } else { + SScanLogicNode* pScan = (SScanLogicNode*)pChild; + nodesWalkExprs(pScan->pScanCols, eliminateProjOptRewriteScanTableAlias, pProjectNode->stmtName); + nodesWalkExprs(pScan->pScanPseudoCols, eliminateProjOptRewriteScanTableAlias, pProjectNode->stmtName); + nodesWalkExpr(pScan->node.pConditions, eliminateProjOptRewriteScanTableAlias, pProjectNode->stmtName); + nodesWalkExprs(pChild->pTargets, eliminateProjOptRewriteScanTableAlias, pProjectNode->stmtName); + } + int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pProjectNode, pChild); if (pProjectNode->node.pHint && !pChild->pHint) TSWAP(pProjectNode->node.pHint, pChild->pHint); if (TSDB_CODE_SUCCESS == code) { @@ -4427,16 +4529,23 @@ static int32_t sortNonPriKeyOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog } static bool stbJoinOptShouldBeOptimized(SLogicNode* pNode) { - if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode)) { + if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode) || OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_STB_JOIN)) { return false; } SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode; + if (pJoin->joinAlgo == JOIN_ALGO_UNKNOWN) { + pJoin->joinAlgo = JOIN_ALGO_MERGE; + } + if (JOIN_STYPE_NONE != pJoin->subType || pJoin->isSingleTableJoin || NULL == pJoin->pTagEqCond || pNode->pChildren->length != 2 - || pJoin->hasSubQuery || pJoin->joinAlgo != JOIN_ALGO_UNKNOWN || pJoin->isLowLevelJoin) { - if (pJoin->joinAlgo == JOIN_ALGO_UNKNOWN) { - pJoin->joinAlgo = JOIN_ALGO_MERGE; - } + || pJoin->isLowLevelJoin) { + return false; + } + + SNode* pLeft = nodesListGetNode(pJoin->node.pChildren, 0); + SNode* pRight = nodesListGetNode(pJoin->node.pChildren, 1); + if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pLeft) || QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pRight)) { return false; } @@ -4582,7 +4691,8 @@ static int32_t stbJoinOptCreateTagHashJoinNode(SLogicNode* pOrig, SNodeList* pCh nodesDestroyList(pCols); if (TSDB_CODE_SUCCESS == code) { - *ppLogic = (SLogicNode*)pJoin; + *ppLogic = (SLogicNode*)pJoin; + OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_STB_JOIN); } else { nodesDestroyNode((SNode*)pJoin); } @@ -4729,6 +4839,7 @@ static int32_t stbJoinOptCreateMergeJoinNode(SLogicNode* pOrig, SLogicNode* pChi if (TSDB_CODE_SUCCESS == code) { pChild->pParent = (SLogicNode*)pJoin; *ppLogic = (SLogicNode*)pJoin; + OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_STB_JOIN); } else { nodesDestroyNode((SNode*)pJoin); } @@ -5089,6 +5200,7 @@ static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub static const SOptimizeRule optimizeRuleSet[] = { {.pName = "ScanPath", .optimizeFunc = scanPathOptimize}, {.pName = "PushDownCondition", .optimizeFunc = pdcOptimize}, + {.pName = "JoinCondOptimize", .optimizeFunc = joinCondOptimize}, {.pName = "StableJoin", .optimizeFunc = stableJoinOptimize}, {.pName = "GroupJoin", .optimizeFunc = groupJoinOptimize}, {.pName = "sortNonPriKeyOptimize", .optimizeFunc = sortNonPriKeyOptimize},