From f36b0be17d0c91dcd45941bd5d753d39293e6856 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 19 May 2023 14:32:03 +0800 Subject: [PATCH] fix: join eq conditions not only for tag --- include/libs/nodes/plannodes.h | 16 +++---- source/libs/executor/src/joinoperator.c | 56 +++++++++++----------- source/libs/nodes/src/nodesCloneFuncs.c | 4 +- source/libs/nodes/src/nodesCodeFuncs.c | 12 ++--- source/libs/nodes/src/nodesMsgFuncs.c | 8 ++-- source/libs/nodes/src/nodesUtilFuncs.c | 14 +++--- source/libs/planner/src/planOptimizer.c | 33 ++++++------- source/libs/planner/src/planPhysiCreater.c | 6 +-- 8 files changed, 73 insertions(+), 76 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 0e235191b1..f559ba2b51 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -112,7 +112,7 @@ typedef struct SJoinLogicNode { SNode* pOnConditions; bool isSingleTableJoin; EOrder inputTsOrder; - SNode* pTagEqualConditions; + SNode* pEqualOnConditions; } SJoinLogicNode; typedef struct SAggLogicNode { @@ -406,7 +406,7 @@ typedef struct SSortMergeJoinPhysiNode { SNode* pOnConditions; SNodeList* pTargets; EOrder inputTsOrder; - SNode* pTagEqualCondtions; + SNode* pEqualOnCondtions; } SSortMergeJoinPhysiNode; typedef struct SAggPhysiNode { @@ -448,7 +448,7 @@ typedef struct SMergePhysiNode { bool groupSort; } SMergePhysiNode; -typedef struct SWinodwPhysiNode { +typedef struct SWindowPhysiNode { SPhysiNode node; SNodeList* pExprs; // these are expression list of parameter expression of function SNodeList* pFuncs; @@ -461,10 +461,10 @@ typedef struct SWinodwPhysiNode { EOrder inputTsOrder; EOrder outputTsOrder; bool mergeDataBlock; -} SWinodwPhysiNode; +} SWindowPhysiNode; typedef struct SIntervalPhysiNode { - SWinodwPhysiNode window; + SWindowPhysiNode window; int64_t interval; int64_t offset; int64_t sliding; @@ -497,7 +497,7 @@ typedef struct SMultiTableIntervalPhysiNode { } SMultiTableIntervalPhysiNode; typedef struct SSessionWinodwPhysiNode { - SWinodwPhysiNode window; + SWindowPhysiNode window; int64_t gap; } SSessionWinodwPhysiNode; @@ -506,14 +506,14 @@ typedef SSessionWinodwPhysiNode SStreamSemiSessionWinodwPhysiNode; typedef SSessionWinodwPhysiNode SStreamFinalSessionWinodwPhysiNode; typedef struct SStateWinodwPhysiNode { - SWinodwPhysiNode window; + SWindowPhysiNode window; SNode* pStateKey; } SStateWinodwPhysiNode; typedef SStateWinodwPhysiNode SStreamStateWinodwPhysiNode; typedef struct SEventWinodwPhysiNode { - SWinodwPhysiNode window; + SWindowPhysiNode window; SNode* pStartCond; SNode* pEndCond; } SEventWinodwPhysiNode; diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 9dee58367c..46c6b24295 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -52,15 +52,15 @@ typedef struct SJoinOperatorInfo { int32_t rightPos; SColumnInfo rightCol; SNode* pCondAfterMerge; - SNode* pTagEqualConditions; + SNode* pEqualOnConditions; - SArray* leftTagCols; - char* leftTagKeyBuf; - int32_t leftTagKeyLen; + SArray* leftEqOnCondCols; + char* leftEqOnCondKeyBuf; + int32_t leftEqOnCondKeyLen; - SArray* rightTagCols; - char* rightTagKeyBuf; - int32_t rightTagKeyLen; + SArray* rightEqOnCondCols; + char* rightEqOnCondKeyBuf; + int32_t rightEqOnCondKeyLen; SSHashObj* rightBuildTable; SJoinRowCtx rowCtx; @@ -104,7 +104,7 @@ static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDown setJoinColumnInfo(&pInfo->rightCol, rightTsCol); } -static void extractTagEqualColsFromOper(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstreams, SOperatorNode* pOperNode, +static void extractEqualOnCondColsFromOper(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstreams, SOperatorNode* pOperNode, SColumn* pLeft, SColumn* pRight) { SColumnNode* pLeftNode = (SColumnNode*)pOperNode->pLeft; SColumnNode* pRightNode = (SColumnNode*)pOperNode->pRight; @@ -125,7 +125,7 @@ static void extractTagEqualCondCols(SJoinOperatorInfo* pInfo, SOperatorInfo** pD SNode* pNode = NULL; FOREACH(pNode, ((SLogicConditionNode*)pTagEqualNode)->pParameterList) { SOperatorNode* pOperNode = (SOperatorNode*)pNode; - extractTagEqualColsFromOper(pInfo, pDownStream, pOperNode, &left, &right); + extractEqualOnCondColsFromOper(pInfo, pDownStream, pOperNode, &left, &right); taosArrayPush(leftTagEqCols, &left); taosArrayPush(rightTagEqCols, &right); } @@ -134,7 +134,7 @@ static void extractTagEqualCondCols(SJoinOperatorInfo* pInfo, SOperatorInfo** pD if (nodeType(pTagEqualNode) == QUERY_NODE_OPERATOR) { SOperatorNode* pOperNode = (SOperatorNode*)pTagEqualNode; - extractTagEqualColsFromOper(pInfo, pDownStream, pOperNode, &left, &right); + extractEqualOnCondColsFromOper(pInfo, pDownStream, pOperNode, &left, &right); taosArrayPush(leftTagEqCols, &left); taosArrayPush(rightTagEqCols, &right); } @@ -259,13 +259,13 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->inputOrder = TSDB_ORDER_DESC; } - pInfo->pTagEqualConditions = pJoinNode->pTagEqualCondtions; - if (pInfo->pTagEqualConditions != NULL) { - pInfo->leftTagCols = taosArrayInit(4, sizeof(SColumn)); - pInfo->rightTagCols = taosArrayInit(4, sizeof(SColumn)); - extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols); - initTagColskeyBuf(&pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols); - initTagColskeyBuf(&pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols); + pInfo->pEqualOnConditions = pJoinNode->pEqualOnCondtions; + if (pInfo->pEqualOnConditions != NULL) { + pInfo->leftEqOnCondCols = taosArrayInit(4, sizeof(SColumn)); + pInfo->rightEqOnCondCols = taosArrayInit(4, sizeof(SColumn)); + extractTagEqualCondCols(pInfo, pDownstream, pInfo->pEqualOnConditions, pInfo->leftEqOnCondCols, pInfo->rightEqOnCondCols); + initTagColskeyBuf(&pInfo->leftEqOnCondKeyLen, &pInfo->leftEqOnCondKeyBuf, pInfo->leftEqOnCondCols); + initTagColskeyBuf(&pInfo->rightEqOnCondKeyLen, &pInfo->rightEqOnCondKeyBuf, pInfo->rightEqOnCondCols); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->rightBuildTable = tSimpleHashInit(256, hashFn); } @@ -309,13 +309,13 @@ static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) { void destroyMergeJoinOperator(void* param) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; - if (pJoinOperator->pTagEqualConditions != NULL) { + if (pJoinOperator->pEqualOnConditions != NULL) { mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable); - taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf); - taosArrayDestroy(pJoinOperator->rightTagCols); + taosMemoryFreeClear(pJoinOperator->rightEqOnCondKeyBuf); + taosArrayDestroy(pJoinOperator->rightEqOnCondCols); - taosMemoryFreeClear(pJoinOperator->leftTagKeyBuf); - taosArrayDestroy(pJoinOperator->leftTagCols); + taosMemoryFreeClear(pJoinOperator->leftEqOnCondKeyBuf); + taosArrayDestroy(pJoinOperator->leftEqOnCondCols); } nodesDestroyNode(pJoinOperator->pCondAfterMerge); @@ -439,12 +439,12 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator static int32_t mergeJoinFillBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations) { for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) { SRowLocation* rightRow = taosArrayGet(rightRowLocations, i); - int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightTagCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightTagKeyBuf); - SArray** ppRows = tSimpleHashGet(pInfo->rightBuildTable, pInfo->rightTagKeyBuf, keyLen); + int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightEqOnCondCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightEqOnCondKeyBuf); + SArray** ppRows = tSimpleHashGet(pInfo->rightBuildTable, pInfo->rightEqOnCondKeyBuf, keyLen); if (!ppRows) { SArray* rows = taosArrayInit(4, sizeof(SRowLocation)); taosArrayPush(rows, rightRow); - tSimpleHashPut(pInfo->rightBuildTable, pInfo->rightTagKeyBuf, keyLen, &rows, POINTER_BYTES); + tSimpleHashPut(pInfo->rightBuildTable, pInfo->rightEqOnCondKeyBuf, keyLen, &rows, POINTER_BYTES); } else { taosArrayPush(*ppRows, rightRow); } @@ -466,8 +466,8 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); SArray* pRightRows = NULL; if (useBuildTableTSRange) { - int32_t keyLen = fillKeyBufFromTagCols(pJoinInfo->leftTagCols, leftRow->pDataBlock, leftRow->pos, pJoinInfo->leftTagKeyBuf); - SArray** ppRightRows = tSimpleHashGet(pJoinInfo->rightBuildTable, pJoinInfo->leftTagKeyBuf, keyLen); + int32_t keyLen = fillKeyBufFromTagCols(pJoinInfo->leftEqOnCondCols, leftRow->pDataBlock, leftRow->pos, pJoinInfo->leftEqOnCondKeyBuf); + SArray** ppRightRows = tSimpleHashGet(pJoinInfo->rightBuildTable, pJoinInfo->leftEqOnCondKeyBuf, keyLen); if (!ppRightRows) { continue; } @@ -567,7 +567,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks); mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); - if (pJoinInfo->pTagEqualConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) { + if (pJoinInfo->pEqualOnConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) { mergeJoinFillBuildTable(pJoinInfo, rightRowLocations); rightUseBuildTable = true; taosArrayDestroy(rightRowLocations); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 91db527a02..c2771e0005 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -401,7 +401,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { COPY_SCALAR_FIELD(joinType); CLONE_NODE_FIELD(pMergeCondition); CLONE_NODE_FIELD(pOnConditions); - CLONE_NODE_FIELD(pTagEqualConditions); + CLONE_NODE_FIELD(pEqualOnConditions); COPY_SCALAR_FIELD(isSingleTableJoin); COPY_SCALAR_FIELD(inputTsOrder); return TSDB_CODE_SUCCESS; @@ -588,7 +588,7 @@ static int32_t physiSysTableScanCopy(const SSystemTableScanPhysiNode* pSrc, SSys return TSDB_CODE_SUCCESS; } -static int32_t physiWindowCopy(const SWinodwPhysiNode* pSrc, SWinodwPhysiNode* pDst) { +static int32_t physiWindowCopy(const SWindowPhysiNode* pSrc, SWindowPhysiNode* pDst) { COPY_BASE_OBJECT_FIELD(node, physiNodeCopy); CLONE_NODE_LIST_FIELD(pExprs); CLONE_NODE_LIST_FIELD(pFuncs); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index b4e2d95e26..d16cd79e97 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1432,7 +1432,7 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOnConditions); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddObject(pJson, jkJoinLogicPlanTagEqualConditions, nodeToJson, pNode->pTagEqualConditions); + code = tjsonAddObject(pJson, jkJoinLogicPlanTagEqualConditions, nodeToJson, pNode->pEqualOnConditions); } return code; } @@ -1451,7 +1451,7 @@ static int32_t jsonToLogicJoinNode(const SJson* pJson, void* pObj) { code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pOnConditions); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeObject(pJson, jkJoinLogicPlanTagEqualConditions, &pNode->pTagEqualConditions); + code = jsonToNodeObject(pJson, jkJoinLogicPlanTagEqualConditions, &pNode->pEqualOnConditions); } return code; } @@ -1905,7 +1905,7 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddObject(pJson, jkJoinPhysiPlanTagEqualConditions, nodeToJson, pNode->pTagEqualCondtions); + code = tjsonAddObject(pJson, jkJoinPhysiPlanTagEqualConditions, nodeToJson, pNode->pEqualOnCondtions); } return code; } @@ -1930,7 +1930,7 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeObject(pJson, jkJoinPhysiPlanTagEqualConditions, &pNode->pTagEqualCondtions); + code = jsonToNodeObject(pJson, jkJoinPhysiPlanTagEqualConditions, &pNode->pEqualOnCondtions); } return code; } @@ -2135,7 +2135,7 @@ static const char* jkWindowPhysiPlanOutputTsOrder = "outputTsOrder"; static const char* jkWindowPhysiPlanMergeDataBlock = "MergeDataBlock"; static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) { - const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj; + const SWindowPhysiNode* pNode = (const SWindowPhysiNode*)pObj; int32_t code = physicPlanNodeToJson(pObj, pJson); if (TSDB_CODE_SUCCESS == code) { @@ -2176,7 +2176,7 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) { } static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) { - SWinodwPhysiNode* pNode = (SWinodwPhysiNode*)pObj; + SWindowPhysiNode* pNode = (SWindowPhysiNode*)pObj; int32_t code = jsonToPhysicPlanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 45cebb4559..59b027d5ed 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2341,7 +2341,7 @@ static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { code = tlvEncodeEnum(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER, pNode->inputTsOrder); } if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS, nodeToMsg, pNode->pTagEqualCondtions); + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS, nodeToMsg, pNode->pEqualOnCondtions); } return code; } @@ -2372,7 +2372,7 @@ static int32_t msgToPhysiJoinNode(STlvDecoder* pDecoder, void* pObj) { code = tlvDecodeEnum(pTlv, &pNode->inputTsOrder, sizeof(pNode->inputTsOrder)); break; case PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS: - code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTagEqualCondtions); + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pEqualOnCondtions); break; default: break; @@ -2639,7 +2639,7 @@ enum { }; static int32_t physiWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { - const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj; + const SWindowPhysiNode* pNode = (const SWindowPhysiNode*)pObj; int32_t code = tlvEncodeObj(pEncoder, PHY_WINDOW_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); if (TSDB_CODE_SUCCESS == code) { @@ -2680,7 +2680,7 @@ static int32_t physiWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { } static int32_t msgToPhysiWindowNode(STlvDecoder* pDecoder, void* pObj) { - SWinodwPhysiNode* pNode = (SWinodwPhysiNode*)pObj; + SWindowPhysiNode* pNode = (SWindowPhysiNode*)pObj; int32_t code = TSDB_CODE_SUCCESS; STlv* pTlv = NULL; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 67747497db..81b429e169 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -599,7 +599,7 @@ static void destroyPhysiNode(SPhysiNode* pNode) { nodesDestroyNode(pNode->pSlimit); } -static void destroyWinodwPhysiNode(SWinodwPhysiNode* pNode) { +static void destroyWinodwPhysiNode(SWindowPhysiNode* pNode) { destroyPhysiNode((SPhysiNode*)pNode); nodesDestroyList(pNode->pExprs); nodesDestroyList(pNode->pFuncs); @@ -1072,7 +1072,7 @@ void nodesDestroyNode(SNode* pNode) { destroyLogicNode((SLogicNode*)pLogicNode); nodesDestroyNode(pLogicNode->pMergeCondition); nodesDestroyNode(pLogicNode->pOnConditions); - nodesDestroyNode(pLogicNode->pTagEqualConditions); + nodesDestroyNode(pLogicNode->pEqualOnConditions); break; } case QUERY_NODE_LOGIC_PLAN_AGG: { @@ -1205,7 +1205,7 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pPhyNode->pMergeCondition); nodesDestroyNode(pPhyNode->pOnConditions); nodesDestroyList(pPhyNode->pTargets); - nodesDestroyNode(pPhyNode->pTagEqualCondtions); + nodesDestroyNode(pPhyNode->pEqualOnCondtions); break; } case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: { @@ -1243,7 +1243,7 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: - destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode); + destroyWinodwPhysiNode((SWindowPhysiNode*)pNode); break; case QUERY_NODE_PHYSICAL_PLAN_FILL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL: { @@ -1259,19 +1259,19 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: - destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode); + destroyWinodwPhysiNode((SWindowPhysiNode*)pNode); break; case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: { SStateWinodwPhysiNode* pPhyNode = (SStateWinodwPhysiNode*)pNode; - destroyWinodwPhysiNode((SWinodwPhysiNode*)pPhyNode); + destroyWinodwPhysiNode((SWindowPhysiNode*)pPhyNode); nodesDestroyNode(pPhyNode->pStateKey); break; } case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT: case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: { SEventWinodwPhysiNode* pPhyNode = (SEventWinodwPhysiNode*)pNode; - destroyWinodwPhysiNode((SWinodwPhysiNode*)pPhyNode); + destroyWinodwPhysiNode((SWindowPhysiNode*)pPhyNode); nodesDestroyNode(pPhyNode->pStartCond); nodesDestroyNode(pPhyNode->pEndCond); break; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 5be67389c8..1107389df9 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -740,18 +740,15 @@ static int32_t pushDownCondOptJoinExtractMergeCond(SOptimizeContext* pCxt, SJoin return code; } -static bool pushDownCondOptIsTag(SNode* pNode, SNodeList* pTableCols) { +static bool pushDownCondOptIsTableColumn(SNode* pNode, SNodeList* pTableCols) { if (QUERY_NODE_COLUMN != nodeType(pNode)) { return false; } SColumnNode* pCol = (SColumnNode*)pNode; - if (COLUMN_TYPE_TAG != pCol->colType) { - return false; - } return pushDownCondOptBelongThisTable(pNode, pTableCols); } -static bool pushDownCondOptIsTagEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { +static bool pushDownCondOptIsEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond) { if (QUERY_NODE_OPERATOR != nodeType(pCond)) { return false; } @@ -770,22 +767,22 @@ static bool pushDownCondOptIsTagEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { } SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets; - if (pushDownCondOptIsTag(pOper->pLeft, pLeftCols)) { - return pushDownCondOptIsTag(pOper->pRight, pRightCols); - } else if (pushDownCondOptIsTag(pOper->pLeft, pRightCols)) { - return pushDownCondOptIsTag(pOper->pRight, pLeftCols); + if (pushDownCondOptIsTableColumn(pOper->pLeft, pLeftCols)) { + return pushDownCondOptIsTableColumn(pOper->pRight, pRightCols); + } else if (pushDownCondOptIsTableColumn(pOper->pLeft, pRightCols)) { + return pushDownCondOptIsTableColumn(pOper->pRight, pLeftCols); } return false; } -static int32_t pushDownCondOptJoinExtractTagEqualLogicCond(SJoinLogicNode* pJoin) { +static int32_t pushDownCondOptJoinExtractEqualOnLogicCond(SJoinLogicNode* pJoin) { SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOnConditions); int32_t code = TSDB_CODE_SUCCESS; SNodeList* pTagEqualConds = NULL; SNode* pCond = NULL; FOREACH(pCond, pLogicCond->pParameterList) { - if (pushDownCondOptIsTagEqualCond(pJoin, pCond)) { + if (pushDownCondOptIsEqualOnCond(pJoin, pCond)) { code = nodesListMakeAppend(&pTagEqualConds, nodesCloneNode(pCond)); } } @@ -796,7 +793,7 @@ static int32_t pushDownCondOptJoinExtractTagEqualLogicCond(SJoinLogicNode* pJoin } if (TSDB_CODE_SUCCESS == code) { - pJoin->pTagEqualConditions = pTempTagEqCond; + pJoin->pEqualOnConditions = pTempTagEqCond; return TSDB_CODE_SUCCESS; } else { nodesDestroyList(pTagEqualConds); @@ -805,18 +802,18 @@ static int32_t pushDownCondOptJoinExtractTagEqualLogicCond(SJoinLogicNode* pJoin return TSDB_CODE_SUCCESS; } -static int32_t pushDownCondOptJoinExtractTagEqualCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { +static int32_t pushDownCondOptJoinExtractEqualOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { if (NULL == pJoin->pOnConditions) { - pJoin->pTagEqualConditions = NULL; + pJoin->pEqualOnConditions = NULL; return TSDB_CODE_SUCCESS; } if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions) && LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOnConditions))->condType) { - return pushDownCondOptJoinExtractTagEqualLogicCond(pJoin); + return pushDownCondOptJoinExtractEqualOnLogicCond(pJoin); } - if (pushDownCondOptIsTagEqualCond(pJoin, pJoin->pOnConditions)) { - pJoin->pTagEqualConditions = nodesCloneNode(pJoin->pOnConditions); + if (pushDownCondOptIsEqualOnCond(pJoin, pJoin->pOnConditions)) { + pJoin->pEqualOnConditions = nodesCloneNode(pJoin->pOnConditions); } return TSDB_CODE_SUCCESS; @@ -857,7 +854,7 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p } if (TSDB_CODE_SUCCESS == code) { - code = pushDownCondOptJoinExtractTagEqualCond(pCxt, pJoin); + code = pushDownCondOptJoinExtractEqualOnCond(pCxt, pJoin); } if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index e07c6ebcfe..a13ca6cba9 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -705,8 +705,8 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren pJoinLogicNode->pOnConditions, &pJoin->pOnConditions); } - if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pTagEqualConditions) { - code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqualConditions, &pJoin->pTagEqualCondtions); + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pEqualOnConditions) { + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pEqualOnConditions, &pJoin->pEqualOnCondtions); } if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin); @@ -1150,7 +1150,7 @@ static int32_t createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNo } } -static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWinodwPhysiNode* pWindow, +static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowPhysiNode* pWindow, SWindowLogicNode* pWindowLogicNode) { pWindow->triggerType = pWindowLogicNode->triggerType; pWindow->watermark = pWindowLogicNode->watermark;