fix: join blockId and target issues

This commit is contained in:
dapan1121 2023-08-17 13:50:26 +08:00
parent 73d7caa63a
commit e8fa9aa633
13 changed files with 53 additions and 32 deletions

View File

@ -304,7 +304,7 @@ typedef enum ENodeType {
QUERY_NODE_REVOKE_STMT,
QUERY_NODE_SHOW_DNODES_STMT,
QUERY_NODE_SHOW_MNODES_STMT,
// QUERY_NODE_SHOW_MODULES_STMT,
QUERY_NODE_SHOW_MODULES_STMT,
QUERY_NODE_SHOW_QNODES_STMT,
QUERY_NODE_SHOW_SNODES_STMT,
QUERY_NODE_SHOW_BNODES_STMT,
@ -367,10 +367,10 @@ typedef enum ENodeType {
QUERY_NODE_LOGIC_PLAN_PARTITION,
QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC,
QUERY_NODE_LOGIC_PLAN_INTERP_FUNC,
QUERY_NODE_LOGIC_PLAN_GROUP_CACHE,
QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL,
QUERY_NODE_LOGIC_SUBPLAN,
QUERY_NODE_LOGIC_PLAN,
QUERY_NODE_LOGIC_PLAN_GROUP_CACHE,
QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL,
// physical plan node
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN = 1100,
@ -383,7 +383,6 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN,
QUERY_NODE_PHYSICAL_PLAN_PROJECT,
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN,
QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN,
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE,
QUERY_NODE_PHYSICAL_PLAN_MERGE,
@ -411,13 +410,14 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_INSERT,
QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT,
QUERY_NODE_PHYSICAL_PLAN_DELETE,
QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE,
QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL,
QUERY_NODE_PHYSICAL_SUBPLAN,
QUERY_NODE_PHYSICAL_PLAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN,
QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT,
QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT,
QUERY_NODE_PHYSICAL_SUBPLAN,
QUERY_NODE_PHYSICAL_PLAN
QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN,
QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE,
QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL
} ENodeType;

View File

@ -124,6 +124,7 @@ typedef struct SJoinLogicNode {
SNode* pOtherOnCond;
bool isSingleTableJoin;
bool hasSubQuery;
bool isLowLevelJoin;
} SJoinLogicNode;
typedef struct SAggLogicNode {

View File

@ -200,6 +200,7 @@ typedef struct SJoinTableNode {
STableNode table; // QUERY_NODE_JOIN_TABLE
EJoinType joinType;
bool hasSubQuery;
bool isLowLevelJoin;
SNode* pLeft;
SNode* pRight;
SNode* pOnCond;

View File

@ -70,6 +70,7 @@ typedef struct SStbJoinDynCtrlInfo {
SDynQueryCtrlExecInfo execInfo;
SStbJoinDynCtrlBasic basic;
SStbJoinDynCtrlCtx ctx;
int16_t outputBlkId;
} SStbJoinDynCtrlInfo;
typedef struct SDynQueryCtrlOperatorInfo {

View File

@ -757,6 +757,12 @@ static void seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppR
return;
}
static FORCE_INLINE SSDataBlock* seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
pBlock->info.id.blockId = pStbJoin->outputBlkId;
return pBlock;
}
SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
@ -792,7 +798,7 @@ _return:
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
}
return pRes;
return pRes ? seqStableJoinComposeRes(pStbJoin, pRes) : NULL;
}
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
@ -847,6 +853,7 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32
switch (pInfo->qType) {
case DYN_QTYPE_STB_HASH:
memcpy(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
pInfo->stbJoin.outputBlkId = pPhyciNode->node.pOutputDataBlockDesc->dataBlockId;
code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
if (TSDB_CODE_SUCCESS != code) {
goto _error;

View File

@ -259,6 +259,8 @@ static int32_t tempTableNodeCopy(const STempTableNode* pSrc, STempTableNode* pDs
static int32_t joinTableNodeCopy(const SJoinTableNode* pSrc, SJoinTableNode* pDst) {
COPY_BASE_OBJECT_FIELD(table, tableNodeCopy);
COPY_SCALAR_FIELD(joinType);
COPY_SCALAR_FIELD(hasSubQuery);
COPY_SCALAR_FIELD(isLowLevelJoin);
CLONE_NODE_FIELD(pLeft);
CLONE_NODE_FIELD(pRight);
CLONE_NODE_FIELD(pOnCond);

View File

@ -197,10 +197,8 @@ const char* nodesNodeName(ENodeType type) {
return "ShowDnodesStmt";
case QUERY_NODE_SHOW_MNODES_STMT:
return "ShowMnodesStmt";
/*
case QUERY_NODE_SHOW_MODULES_STMT:
return "ShowModulesStmt";
*/
case QUERY_NODE_SHOW_QNODES_STMT:
return "ShowQnodesStmt";
case QUERY_NODE_SHOW_SNODES_STMT:

View File

@ -408,7 +408,7 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SRevokeStmt));
case QUERY_NODE_SHOW_DNODES_STMT:
case QUERY_NODE_SHOW_MNODES_STMT:
// case QUERY_NODE_SHOW_MODULES_STMT:
case QUERY_NODE_SHOW_MODULES_STMT:
case QUERY_NODE_SHOW_QNODES_STMT:
case QUERY_NODE_SHOW_SNODES_STMT:
case QUERY_NODE_SHOW_BNODES_STMT:
@ -1010,7 +1010,7 @@ void nodesDestroyNode(SNode* pNode) {
break;
case QUERY_NODE_SHOW_DNODES_STMT:
case QUERY_NODE_SHOW_MNODES_STMT:
// case QUERY_NODE_SHOW_MODULES_STMT:
case QUERY_NODE_SHOW_MODULES_STMT:
case QUERY_NODE_SHOW_QNODES_STMT:
case QUERY_NODE_SHOW_SNODES_STMT:
case QUERY_NODE_SHOW_BNODES_STMT:

View File

@ -690,10 +690,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromShowDnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_MNODES_STMT:
return collectMetaKeyFromShowMnodes(pCxt, (SShowStmt*)pStmt);
/*
case QUERY_NODE_SHOW_MODULES_STMT:
return collectMetaKeyFromShowModules(pCxt, (SShowStmt*)pStmt);
*/
case QUERY_NODE_SHOW_QNODES_STMT:
return collectMetaKeyFromShowQnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_SNODES_STMT:

View File

@ -263,7 +263,7 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
return authAlterTable(pCxt, (SAlterTableStmt*)pStmt);
case QUERY_NODE_SHOW_DNODES_STMT:
case QUERY_NODE_SHOW_MNODES_STMT:
// case QUERY_NODE_SHOW_MODULES_STMT:
case QUERY_NODE_SHOW_MODULES_STMT:
case QUERY_NODE_SHOW_QNODES_STMT:
case QUERY_NODE_SHOW_SNODES_STMT:
case QUERY_NODE_SHOW_BNODES_STMT:

View File

@ -92,7 +92,6 @@ static const SSysTableShowAdapter sysTableShowAdapter[] = {
.numOfShowCols = 1,
.pShowCols = {"*"}
},
/*
{
.showType = QUERY_NODE_SHOW_MODULES_STMT,
.pDbName = TSDB_INFORMATION_SCHEMA_DB,
@ -100,7 +99,6 @@ static const SSysTableShowAdapter sysTableShowAdapter[] = {
.numOfShowCols = 1,
.pShowCols = {"*"}
},
*/
{
.showType = QUERY_NODE_SHOW_QNODES_STMT,
.pDbName = TSDB_INFORMATION_SCHEMA_DB,
@ -2811,6 +2809,12 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
pJoinTable->table.singleTable = joinTableIsSingleTable(pJoinTable);
code = translateExpr(pCxt, &pJoinTable->pOnCond);
pJoinTable->hasSubQuery = (nodeType(pJoinTable->pLeft) != QUERY_NODE_REAL_TABLE) || (nodeType(pJoinTable->pRight) != QUERY_NODE_REAL_TABLE);
if (nodeType(pJoinTable->pLeft) == QUERY_NODE_JOIN_TABLE) {
((SJoinTableNode*)pJoinTable->pLeft)->isLowLevelJoin = true;
}
if (nodeType(pJoinTable->pRight) == QUERY_NODE_JOIN_TABLE) {
((SJoinTableNode*)pJoinTable->pRight)->isLowLevelJoin = true;
}
}
break;
}
@ -9178,7 +9182,7 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_SHOW_USERS_STMT:
case QUERY_NODE_SHOW_DNODES_STMT:
case QUERY_NODE_SHOW_MNODES_STMT:
// case QUERY_NODE_SHOW_MODULES_STMT:
case QUERY_NODE_SHOW_MODULES_STMT:
case QUERY_NODE_SHOW_QNODES_STMT:
case QUERY_NODE_SHOW_FUNCTIONS_STMT:
case QUERY_NODE_SHOW_INDEXES_STMT:

View File

@ -442,6 +442,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pJoin->node.groupAction = GROUP_ACTION_CLEAR;
pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL;
pJoin->node.resultDataOrder = DATA_ORDER_LEVEL_GLOBAL;
pJoin->isLowLevelJoin = pJoinTable->isLowLevelJoin;
int32_t code = TSDB_CODE_SUCCESS;
@ -478,7 +479,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
// set the output
if (TSDB_CODE_SUCCESS == code) {
SNodeList* pColList = NULL;
if (QUERY_NODE_REAL_TABLE == nodeType(pJoinTable->pLeft)) {
if (QUERY_NODE_REAL_TABLE == nodeType(pJoinTable->pLeft) && !pJoin->isLowLevelJoin) {
code = nodesCollectColumns(pSelect, SQL_CLAUSE_WHERE, ((SRealTableNode*)pJoinTable->pLeft)->table.tableAlias, COLLECT_COL_TYPE_ALL, &pColList);
} else {
pJoin->node.pTargets = nodesCloneList(pLeft->pTargets);
@ -493,7 +494,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
if (TSDB_CODE_SUCCESS == code) {
SNodeList* pColList = NULL;
if (QUERY_NODE_REAL_TABLE == nodeType(pJoinTable->pRight)) {
if (QUERY_NODE_REAL_TABLE == nodeType(pJoinTable->pRight) && !pJoin->isLowLevelJoin) {
code = nodesCollectColumns(pSelect, SQL_CLAUSE_WHERE, ((SRealTableNode*)pJoinTable->pRight)->table.tableAlias, COLLECT_COL_TYPE_ALL, &pColList);
} else {
if (pJoin->node.pTargets) {

View File

@ -656,7 +656,7 @@ static int32_t pushDownCondOptPushCondToChild(SOptimizeContext* pCxt, SLogicNode
return pushDownCondOptAppendCond(&pChild->pConditions, pCond);
}
static bool pushDownCondOptIsPriKey(SNode* pNode, SNodeList* pTableCols) {
static bool pushDownCondOptIsPriKey(SNode* pNode, SSHashObj* pTables) {
if (QUERY_NODE_COLUMN != nodeType(pNode)) {
return false;
}
@ -664,7 +664,7 @@ static bool pushDownCondOptIsPriKey(SNode* pNode, SNodeList* pTableCols) {
if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId || TSDB_SYSTEM_TABLE == pCol->tableType) {
return false;
}
return pushDownCondOptBelongThisTable(pNode, pTableCols);
return pushDownCondOptColInTableList(pNode, pTables);
}
static bool pushDownCondOptIsPriKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
@ -677,14 +677,22 @@ static bool pushDownCondOptIsPriKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond
return false;
}
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets;
if (pushDownCondOptIsPriKey(pOper->pLeft, pLeftCols)) {
return pushDownCondOptIsPriKey(pOper->pRight, pRightCols);
} else if (pushDownCondOptIsPriKey(pOper->pLeft, pRightCols)) {
return pushDownCondOptIsPriKey(pOper->pRight, pLeftCols);
SSHashObj* pLeftTables = NULL;
SSHashObj* pRightTables = NULL;
collectTableAliasFromNodes(nodesListGetNode(pJoin->node.pChildren, 0), &pLeftTables);
collectTableAliasFromNodes(nodesListGetNode(pJoin->node.pChildren, 1), &pRightTables);
bool res = false;
if (pushDownCondOptIsPriKey(pOper->pLeft, pLeftTables)) {
res = pushDownCondOptIsPriKey(pOper->pRight, pRightTables);
} else if (pushDownCondOptIsPriKey(pOper->pLeft, pRightTables)) {
res = pushDownCondOptIsPriKey(pOper->pRight, pLeftTables);
}
return false;
tSimpleHashCleanup(pLeftTables);
tSimpleHashCleanup(pRightTables);
return res;
}
static bool pushDownCondOptContainPriKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
@ -3177,7 +3185,7 @@ static bool stbJoinOptShouldBeOptimized(SLogicNode* pNode) {
SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
if (pJoin->isSingleTableJoin || NULL == pJoin->pTagEqCond || pNode->pChildren->length != 2
|| pJoin->hasSubQuery || pJoin->joinAlgo != JOIN_ALGO_UNKNOWN || (pNode->pParent && nodeType(pNode->pParent) == QUERY_NODE_LOGIC_PLAN_JOIN)) {
|| pJoin->hasSubQuery || pJoin->joinAlgo != JOIN_ALGO_UNKNOWN || pJoin->isLowLevelJoin) {
if (pJoin->joinAlgo == JOIN_ALGO_UNKNOWN) {
pJoin->joinAlgo = JOIN_ALGO_MERGE;
}