From e8fa9aa6337f3ae9a419da399e8dbc60871ac32c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 17 Aug 2023 13:50:26 +0800 Subject: [PATCH] fix: join blockId and target issues --- include/common/tmsg.h | 16 +++++------ include/libs/nodes/plannodes.h | 1 + include/libs/nodes/querynodes.h | 1 + source/libs/executor/inc/dynqueryctrl.h | 1 + .../libs/executor/src/dynqueryctrloperator.c | 9 +++++- source/libs/nodes/src/nodesCloneFuncs.c | 2 ++ source/libs/nodes/src/nodesCodeFuncs.c | 2 -- source/libs/nodes/src/nodesUtilFuncs.c | 4 +-- source/libs/parser/src/parAstParser.c | 2 -- source/libs/parser/src/parAuthenticator.c | 2 +- source/libs/parser/src/parTranslater.c | 10 +++++-- source/libs/planner/src/planLogicCreater.c | 7 +++-- source/libs/planner/src/planOptimizer.c | 28 ++++++++++++------- 13 files changed, 53 insertions(+), 32 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1a61269aaf..348fb973a8 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -304,7 +304,7 @@ typedef enum ENodeType { QUERY_NODE_REVOKE_STMT, QUERY_NODE_SHOW_DNODES_STMT, QUERY_NODE_SHOW_MNODES_STMT, -// QUERY_NODE_SHOW_MODULES_STMT, + QUERY_NODE_SHOW_MODULES_STMT, QUERY_NODE_SHOW_QNODES_STMT, QUERY_NODE_SHOW_SNODES_STMT, QUERY_NODE_SHOW_BNODES_STMT, @@ -367,10 +367,10 @@ typedef enum ENodeType { QUERY_NODE_LOGIC_PLAN_PARTITION, QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC, QUERY_NODE_LOGIC_PLAN_INTERP_FUNC, - QUERY_NODE_LOGIC_PLAN_GROUP_CACHE, - QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL, QUERY_NODE_LOGIC_SUBPLAN, QUERY_NODE_LOGIC_PLAN, + QUERY_NODE_LOGIC_PLAN_GROUP_CACHE, + QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL, // physical plan node QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN = 1100, @@ -383,7 +383,6 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, QUERY_NODE_PHYSICAL_PLAN_PROJECT, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, - QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, QUERY_NODE_PHYSICAL_PLAN_MERGE, @@ -411,13 +410,14 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_INSERT, QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT, QUERY_NODE_PHYSICAL_PLAN_DELETE, - QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, - QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, + QUERY_NODE_PHYSICAL_SUBPLAN, + QUERY_NODE_PHYSICAL_PLAN, QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT, QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, - QUERY_NODE_PHYSICAL_SUBPLAN, - QUERY_NODE_PHYSICAL_PLAN + QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, + QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, + QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL } ENodeType; diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index fa226c3ac3..9d2247f479 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -124,6 +124,7 @@ typedef struct SJoinLogicNode { SNode* pOtherOnCond; bool isSingleTableJoin; bool hasSubQuery; + bool isLowLevelJoin; } SJoinLogicNode; typedef struct SAggLogicNode { diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 3f6dbf4a2a..6d3fd6480d 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -200,6 +200,7 @@ typedef struct SJoinTableNode { STableNode table; // QUERY_NODE_JOIN_TABLE EJoinType joinType; bool hasSubQuery; + bool isLowLevelJoin; SNode* pLeft; SNode* pRight; SNode* pOnCond; diff --git a/source/libs/executor/inc/dynqueryctrl.h b/source/libs/executor/inc/dynqueryctrl.h index fe1a1140af..793fbc0e61 100755 --- a/source/libs/executor/inc/dynqueryctrl.h +++ b/source/libs/executor/inc/dynqueryctrl.h @@ -70,6 +70,7 @@ typedef struct SStbJoinDynCtrlInfo { SDynQueryCtrlExecInfo execInfo; SStbJoinDynCtrlBasic basic; SStbJoinDynCtrlCtx ctx; + int16_t outputBlkId; } SStbJoinDynCtrlInfo; typedef struct SDynQueryCtrlOperatorInfo { diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 6fb7e693cf..f2ed4ba618 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -757,6 +757,12 @@ static void seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppR return; } +static FORCE_INLINE SSDataBlock* seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) { + pBlock->info.id.blockId = pStbJoin->outputBlkId; + return pBlock; +} + + SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) { SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; @@ -792,7 +798,7 @@ _return: pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } - return pRes; + return pRes ? seqStableJoinComposeRes(pStbJoin, pRes) : NULL; } int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) { @@ -847,6 +853,7 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32 switch (pInfo->qType) { case DYN_QTYPE_STB_HASH: memcpy(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin)); + pInfo->stbJoin.outputBlkId = pPhyciNode->node.pOutputDataBlockDesc->dataBlockId; code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch); if (TSDB_CODE_SUCCESS != code) { goto _error; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 1e9075994a..c62190b68a 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -259,6 +259,8 @@ static int32_t tempTableNodeCopy(const STempTableNode* pSrc, STempTableNode* pDs static int32_t joinTableNodeCopy(const SJoinTableNode* pSrc, SJoinTableNode* pDst) { COPY_BASE_OBJECT_FIELD(table, tableNodeCopy); COPY_SCALAR_FIELD(joinType); + COPY_SCALAR_FIELD(hasSubQuery); + COPY_SCALAR_FIELD(isLowLevelJoin); CLONE_NODE_FIELD(pLeft); CLONE_NODE_FIELD(pRight); CLONE_NODE_FIELD(pOnCond); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index d8f250d38d..c72b03817b 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -197,10 +197,8 @@ const char* nodesNodeName(ENodeType type) { return "ShowDnodesStmt"; case QUERY_NODE_SHOW_MNODES_STMT: return "ShowMnodesStmt"; -/* case QUERY_NODE_SHOW_MODULES_STMT: return "ShowModulesStmt"; -*/ case QUERY_NODE_SHOW_QNODES_STMT: return "ShowQnodesStmt"; case QUERY_NODE_SHOW_SNODES_STMT: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 83bf9099ef..e989048545 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -408,7 +408,7 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SRevokeStmt)); case QUERY_NODE_SHOW_DNODES_STMT: case QUERY_NODE_SHOW_MNODES_STMT: -// case QUERY_NODE_SHOW_MODULES_STMT: + case QUERY_NODE_SHOW_MODULES_STMT: case QUERY_NODE_SHOW_QNODES_STMT: case QUERY_NODE_SHOW_SNODES_STMT: case QUERY_NODE_SHOW_BNODES_STMT: @@ -1010,7 +1010,7 @@ void nodesDestroyNode(SNode* pNode) { break; case QUERY_NODE_SHOW_DNODES_STMT: case QUERY_NODE_SHOW_MNODES_STMT: -// case QUERY_NODE_SHOW_MODULES_STMT: + case QUERY_NODE_SHOW_MODULES_STMT: case QUERY_NODE_SHOW_QNODES_STMT: case QUERY_NODE_SHOW_SNODES_STMT: case QUERY_NODE_SHOW_BNODES_STMT: diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 86b4566d37..fdec9cba79 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -690,10 +690,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromShowDnodes(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_SHOW_MNODES_STMT: return collectMetaKeyFromShowMnodes(pCxt, (SShowStmt*)pStmt); -/* case QUERY_NODE_SHOW_MODULES_STMT: return collectMetaKeyFromShowModules(pCxt, (SShowStmt*)pStmt); -*/ case QUERY_NODE_SHOW_QNODES_STMT: return collectMetaKeyFromShowQnodes(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_SHOW_SNODES_STMT: diff --git a/source/libs/parser/src/parAuthenticator.c b/source/libs/parser/src/parAuthenticator.c index 6a26dcfa8b..9b2ac662c8 100644 --- a/source/libs/parser/src/parAuthenticator.c +++ b/source/libs/parser/src/parAuthenticator.c @@ -263,7 +263,7 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) { return authAlterTable(pCxt, (SAlterTableStmt*)pStmt); case QUERY_NODE_SHOW_DNODES_STMT: case QUERY_NODE_SHOW_MNODES_STMT: -// case QUERY_NODE_SHOW_MODULES_STMT: + case QUERY_NODE_SHOW_MODULES_STMT: case QUERY_NODE_SHOW_QNODES_STMT: case QUERY_NODE_SHOW_SNODES_STMT: case QUERY_NODE_SHOW_BNODES_STMT: diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 5b8318265e..efc6e56847 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -92,7 +92,6 @@ static const SSysTableShowAdapter sysTableShowAdapter[] = { .numOfShowCols = 1, .pShowCols = {"*"} }, -/* { .showType = QUERY_NODE_SHOW_MODULES_STMT, .pDbName = TSDB_INFORMATION_SCHEMA_DB, @@ -100,7 +99,6 @@ static const SSysTableShowAdapter sysTableShowAdapter[] = { .numOfShowCols = 1, .pShowCols = {"*"} }, -*/ { .showType = QUERY_NODE_SHOW_QNODES_STMT, .pDbName = TSDB_INFORMATION_SCHEMA_DB, @@ -2811,6 +2809,12 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) { pJoinTable->table.singleTable = joinTableIsSingleTable(pJoinTable); code = translateExpr(pCxt, &pJoinTable->pOnCond); pJoinTable->hasSubQuery = (nodeType(pJoinTable->pLeft) != QUERY_NODE_REAL_TABLE) || (nodeType(pJoinTable->pRight) != QUERY_NODE_REAL_TABLE); + if (nodeType(pJoinTable->pLeft) == QUERY_NODE_JOIN_TABLE) { + ((SJoinTableNode*)pJoinTable->pLeft)->isLowLevelJoin = true; + } + if (nodeType(pJoinTable->pRight) == QUERY_NODE_JOIN_TABLE) { + ((SJoinTableNode*)pJoinTable->pRight)->isLowLevelJoin = true; + } } break; } @@ -9178,7 +9182,7 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) { case QUERY_NODE_SHOW_USERS_STMT: case QUERY_NODE_SHOW_DNODES_STMT: case QUERY_NODE_SHOW_MNODES_STMT: -// case QUERY_NODE_SHOW_MODULES_STMT: + case QUERY_NODE_SHOW_MODULES_STMT: case QUERY_NODE_SHOW_QNODES_STMT: case QUERY_NODE_SHOW_FUNCTIONS_STMT: case QUERY_NODE_SHOW_INDEXES_STMT: diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 0ff98e4012..8db803d3cd 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -442,7 +442,8 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pJoin->node.groupAction = GROUP_ACTION_CLEAR; pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; pJoin->node.resultDataOrder = DATA_ORDER_LEVEL_GLOBAL; - + pJoin->isLowLevelJoin = pJoinTable->isLowLevelJoin; + int32_t code = TSDB_CODE_SUCCESS; // set left and right node @@ -478,7 +479,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect // set the output if (TSDB_CODE_SUCCESS == code) { SNodeList* pColList = NULL; - if (QUERY_NODE_REAL_TABLE == nodeType(pJoinTable->pLeft)) { + if (QUERY_NODE_REAL_TABLE == nodeType(pJoinTable->pLeft) && !pJoin->isLowLevelJoin) { code = nodesCollectColumns(pSelect, SQL_CLAUSE_WHERE, ((SRealTableNode*)pJoinTable->pLeft)->table.tableAlias, COLLECT_COL_TYPE_ALL, &pColList); } else { pJoin->node.pTargets = nodesCloneList(pLeft->pTargets); @@ -493,7 +494,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect if (TSDB_CODE_SUCCESS == code) { SNodeList* pColList = NULL; - if (QUERY_NODE_REAL_TABLE == nodeType(pJoinTable->pRight)) { + if (QUERY_NODE_REAL_TABLE == nodeType(pJoinTable->pRight) && !pJoin->isLowLevelJoin) { code = nodesCollectColumns(pSelect, SQL_CLAUSE_WHERE, ((SRealTableNode*)pJoinTable->pRight)->table.tableAlias, COLLECT_COL_TYPE_ALL, &pColList); } else { if (pJoin->node.pTargets) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index e2165dbf96..285b60b852 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -656,7 +656,7 @@ static int32_t pushDownCondOptPushCondToChild(SOptimizeContext* pCxt, SLogicNode return pushDownCondOptAppendCond(&pChild->pConditions, pCond); } -static bool pushDownCondOptIsPriKey(SNode* pNode, SNodeList* pTableCols) { +static bool pushDownCondOptIsPriKey(SNode* pNode, SSHashObj* pTables) { if (QUERY_NODE_COLUMN != nodeType(pNode)) { return false; } @@ -664,7 +664,7 @@ static bool pushDownCondOptIsPriKey(SNode* pNode, SNodeList* pTableCols) { if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId || TSDB_SYSTEM_TABLE == pCol->tableType) { return false; } - return pushDownCondOptBelongThisTable(pNode, pTableCols); + return pushDownCondOptColInTableList(pNode, pTables); } static bool pushDownCondOptIsPriKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { @@ -677,14 +677,22 @@ static bool pushDownCondOptIsPriKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond return false; } - SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; - SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets; - if (pushDownCondOptIsPriKey(pOper->pLeft, pLeftCols)) { - return pushDownCondOptIsPriKey(pOper->pRight, pRightCols); - } else if (pushDownCondOptIsPriKey(pOper->pLeft, pRightCols)) { - return pushDownCondOptIsPriKey(pOper->pRight, pLeftCols); + SSHashObj* pLeftTables = NULL; + SSHashObj* pRightTables = NULL; + collectTableAliasFromNodes(nodesListGetNode(pJoin->node.pChildren, 0), &pLeftTables); + collectTableAliasFromNodes(nodesListGetNode(pJoin->node.pChildren, 1), &pRightTables); + + bool res = false; + if (pushDownCondOptIsPriKey(pOper->pLeft, pLeftTables)) { + res = pushDownCondOptIsPriKey(pOper->pRight, pRightTables); + } else if (pushDownCondOptIsPriKey(pOper->pLeft, pRightTables)) { + res = pushDownCondOptIsPriKey(pOper->pRight, pLeftTables); } - return false; + + tSimpleHashCleanup(pLeftTables); + tSimpleHashCleanup(pRightTables); + + return res; } static bool pushDownCondOptContainPriKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { @@ -3177,7 +3185,7 @@ static bool stbJoinOptShouldBeOptimized(SLogicNode* pNode) { SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode; if (pJoin->isSingleTableJoin || NULL == pJoin->pTagEqCond || pNode->pChildren->length != 2 - || pJoin->hasSubQuery || pJoin->joinAlgo != JOIN_ALGO_UNKNOWN || (pNode->pParent && nodeType(pNode->pParent) == QUERY_NODE_LOGIC_PLAN_JOIN)) { + || pJoin->hasSubQuery || pJoin->joinAlgo != JOIN_ALGO_UNKNOWN || pJoin->isLowLevelJoin) { if (pJoin->joinAlgo == JOIN_ALGO_UNKNOWN) { pJoin->joinAlgo = JOIN_ALGO_MERGE; }