From ce2635074d2b9f10d65752bdc37aadfbe2060708 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 26 Jun 2023 11:48:47 +0800 Subject: [PATCH] enh: add uid/vgid functions --- include/libs/function/functionMgt.h | 2 + include/libs/nodes/plannodes.h | 7 +- include/libs/scalar/scalar.h | 2 + source/libs/executor/src/hashjoinoperator.c | 2 +- source/libs/executor/src/scanoperator.c | 6 +- source/libs/function/src/builtins.c | 36 ++++++ source/libs/nodes/src/nodesCloneFuncs.c | 7 +- source/libs/nodes/src/nodesCodeFuncs.c | 16 +-- source/libs/nodes/src/nodesUtilFuncs.c | 6 +- source/libs/planner/src/planLogicCreater.c | 4 +- source/libs/planner/src/planOptimizer.c | 124 ++++++++++++-------- source/libs/planner/src/planPhysiCreater.c | 14 +-- source/libs/scalar/src/sclfunc.c | 25 ++++ 13 files changed, 175 insertions(+), 76 deletions(-) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 55af50e0bc..9264cc773d 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -122,6 +122,8 @@ typedef enum EFunctionType { FUNCTION_TYPE_IROWTS, FUNCTION_TYPE_ISFILLED, FUNCTION_TYPE_TAGS, + FUNCTION_TYPE_TBUID, + FUNCTION_TYPE_VGID, // internal function FUNCTION_TYPE_SELECT_VALUE = 3750, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 3418485f16..88d1780230 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -110,10 +110,11 @@ typedef struct SScanLogicNode { typedef struct SJoinLogicNode { SLogicNode node; EJoinType joinType; - SNode* pMergeCondition; - SNode* pOnConditions; + SNode* pPrimKeyEqCond; + SNode* pColEqCond; + SNode* pTagEqCond; + SNode* pOtherOnCond; bool isSingleTableJoin; - SNode* pColEqualOnConditions; } SJoinLogicNode; typedef struct SAggLogicNode { diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index ef8e80b57f..2e6652f860 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -95,6 +95,8 @@ int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t qTbUidFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t qVgIdFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); /* Aggregation functions */ int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index fca4a86f8d..311a2d1172 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -575,7 +575,7 @@ static int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, S } } - int32_t code = getHJoinValBufSize(pJoin->pRowBufs, getJoinValBufSize(pTable, rowIdx), &pTable->valData, pRow); + int32_t code = getValBufFromPages(pJoin->pRowBufs, getHJoinValBufSize(pTable, rowIdx), &pTable->valData, pRow); if (code) { return code; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2f108c83b0..5559ff5ead 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2503,9 +2503,13 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); // refactor later - if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) { + if (FUNCTION_TYPE_TBNAME == pExprInfo[j].pExpr->_function.functionType) { STR_TO_VARSTR(str, (*mr).me.name); colDataSetVal(pDst, (count), str, false); + } else if (FUNCTION_TYPE_TBUID == pExprInfo[j].pExpr->_function.functionType) { + colDataSetVal(pDst, (count), (char*)&(*mr).me.uid, false); + } else if (FUNCTION_TYPE_VGID == pExprInfo[j].pExpr->_function.functionType) { + colDataSetVal(pDst, (count), (char*)&pTaskInfo->id.vgId, false); } else { // it is a tag value STagVal val = {0}; val.cid = pExprInfo[j].base.pParam[0].pCol->colId; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 657b02c205..5a21e02998 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -679,6 +679,21 @@ static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_ return TSDB_CODE_SUCCESS; } +static int32_t translateTbUidColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + // pseudo column do not need to check parameters + pFunc->node.resType = + (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; + return TSDB_CODE_SUCCESS; +} + +static int32_t translateVgIdColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + // pseudo column do not need to check parameters + pFunc->node.resType = + (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes, .type = TSDB_DATA_TYPE_INT}; + return TSDB_CODE_SUCCESS; +} + + static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); if (2 != numOfParams) { @@ -3517,6 +3532,27 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .sprocessFunc = containsProperlyFunction, .finalizeFunc = NULL }, + { + .name = "_tbuid", + .type = FUNCTION_TYPE_TBUID, + .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, + .translateFunc = translateTbUidColumn, + .getEnvFunc = NULL, + .initFunc = NULL, + .sprocessFunc = qTbUidFunction, + .finalizeFunc = NULL + }, + { + .name = "_vgid", + .type = FUNCTION_TYPE_VGID, + .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, + .translateFunc = translateVgIdColumn, + .getEnvFunc = NULL, + .initFunc = NULL, + .sprocessFunc = qVgIdFunction, + .finalizeFunc = NULL + }, + }; // clang-format on diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 6e4dde4ec1..a853d712b3 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -403,9 +403,10 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); COPY_SCALAR_FIELD(joinType); - CLONE_NODE_FIELD(pMergeCondition); - CLONE_NODE_FIELD(pOnConditions); - CLONE_NODE_FIELD(pColEqualOnConditions); + CLONE_NODE_FIELD(pPrimKeyEqCond); + CLONE_NODE_FIELD(pColEqCond); + CLONE_NODE_FIELD(pTagEqCond); + CLONE_NODE_FIELD(pOtherOnCond); COPY_SCALAR_FIELD(isSingleTableJoin); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 0b449c5bfe..8b7f708b57 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1427,8 +1427,8 @@ static int32_t jsonToLogicPlan(const SJson* pJson, void* pObj) { static const char* jkJoinLogicPlanJoinType = "JoinType"; static const char* jkJoinLogicPlanOnConditions = "OnConditions"; -static const char* jkJoinLogicPlanMergeCondition = "MergeConditions"; -static const char* jkJoinLogicPlanColEqualOnConditions = "ColumnEqualOnConditions"; +static const char* jkJoinLogicPlanPrimKeyEqCondition = "PrimKeyEqCond"; +static const char* jkJoinLogicPlanColEqCondition = "ColumnEqCond"; static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { const SJoinLogicNode* pNode = (const SJoinLogicNode*)pObj; @@ -1438,13 +1438,13 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddIntegerToObject(pJson, jkJoinLogicPlanJoinType, pNode->joinType); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddObject(pJson, jkJoinLogicPlanMergeCondition, nodeToJson, pNode->pMergeCondition); + code = tjsonAddObject(pJson, jkJoinLogicPlanPrimKeyEqCondition, nodeToJson, pNode->pPrimKeyEqCond); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOnConditions); + code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOtherOnCond); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddObject(pJson, jkJoinLogicPlanColEqualOnConditions, nodeToJson, pNode->pColEqualOnConditions); + code = tjsonAddObject(pJson, jkJoinLogicPlanColEqCondition, nodeToJson, pNode->pColEqCond); } return code; } @@ -1457,13 +1457,13 @@ static int32_t jsonToLogicJoinNode(const SJson* pJson, void* pObj) { tjsonGetNumberValue(pJson, jkJoinLogicPlanJoinType, pNode->joinType, code); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeObject(pJson, jkJoinLogicPlanMergeCondition, &pNode->pMergeCondition); + code = jsonToNodeObject(pJson, jkJoinLogicPlanPrimKeyEqCondition, &pNode->pPrimKeyEqCond); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pOnConditions); + code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pOtherOnCond); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeObject(pJson, jkJoinLogicPlanColEqualOnConditions, &pNode->pColEqualOnConditions); + code = jsonToNodeObject(pJson, jkJoinLogicPlanColEqCondition, &pNode->pColEqCond); } return code; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 15232b95b6..f6baa734dd 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1090,9 +1090,9 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_LOGIC_PLAN_JOIN: { SJoinLogicNode* pLogicNode = (SJoinLogicNode*)pNode; destroyLogicNode((SLogicNode*)pLogicNode); - nodesDestroyNode(pLogicNode->pMergeCondition); - nodesDestroyNode(pLogicNode->pOnConditions); - nodesDestroyNode(pLogicNode->pColEqualOnConditions); + nodesDestroyNode(pLogicNode->pPrimKeyEqCond); + nodesDestroyNode(pLogicNode->pOtherOnCond); + nodesDestroyNode(pLogicNode->pColEqCond); break; } case QUERY_NODE_LOGIC_PLAN_AGG: { diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 713f12e229..e56c859561 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -467,8 +467,8 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect // set on conditions if (TSDB_CODE_SUCCESS == code && NULL != pJoinTable->pOnCond) { - pJoin->pOnConditions = nodesCloneNode(pJoinTable->pOnCond); - if (NULL == pJoin->pOnConditions) { + pJoin->pOtherOnCond = nodesCloneNode(pJoinTable->pOnCond); + if (NULL == pJoin->pOtherOnCond) { code = TSDB_CODE_OUT_OF_MEMORY; } } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 2d1a758f33..25b5d21f09 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -615,7 +615,7 @@ static int32_t pushDownCondOptPartCond(SJoinLogicNode* pJoin, SNode** pOnCond, S } static int32_t pushDownCondOptPushCondToOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode** pCond) { - return pushDownCondOptAppendCond(&pJoin->pOnConditions, pCond); + return pushDownCondOptAppendCond(&pJoin->pOtherOnCond, pCond); } static int32_t pushDownCondOptPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) { @@ -674,24 +674,24 @@ static bool pushDownCondOptContainPriKeyEqualCond(SJoinLogicNode* pJoin, SNode* } static int32_t pushDownCondOptCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { - if (NULL == pJoin->pOnConditions) { + if (NULL == pJoin->pOtherOnCond) { return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN); } - if (!pushDownCondOptContainPriKeyEqualCond(pJoin, pJoin->pOnConditions)) { + if (!pushDownCondOptContainPriKeyEqualCond(pJoin, pJoin->pOtherOnCond)) { return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL); } return TSDB_CODE_SUCCESS; } -static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNode** ppMergeCond, SNode** ppOnCond) { - SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOnConditions); +static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNode** ppPrimKeyEqCond, SNode** ppOnCond) { + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOtherOnCond); int32_t code = TSDB_CODE_SUCCESS; SNodeList* pOnConds = NULL; SNode* pCond = NULL; FOREACH(pCond, pLogicCond->pParameterList) { if (pushDownCondOptIsPriKeyEqualCond(pJoin, pCond)) { - *ppMergeCond = nodesCloneNode(pCond); + *ppPrimKeyEqCond = nodesCloneNode(pCond); } else { code = nodesListMakeAppend(&pOnConds, nodesCloneNode(pCond)); } @@ -702,10 +702,10 @@ static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNo code = nodesMergeConds(&pTempOnCond, &pOnConds); } - if (TSDB_CODE_SUCCESS == code && NULL != *ppMergeCond) { + if (TSDB_CODE_SUCCESS == code && NULL != *ppPrimKeyEqCond) { *ppOnCond = pTempOnCond; - nodesDestroyNode(pJoin->pOnConditions); - pJoin->pOnConditions = NULL; + nodesDestroyNode(pJoin->pOtherOnCond); + pJoin->pOtherOnCond = NULL; return TSDB_CODE_SUCCESS; } else { nodesDestroyList(pOnConds); @@ -714,35 +714,35 @@ static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNo } } -static int32_t pushDownCondOptPartJoinOnCond(SJoinLogicNode* pJoin, SNode** ppMergeCond, SNode** ppOnCond) { - if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions) && - LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOnConditions))->condType) { - return pushDownCondOptPartJoinOnCondLogicCond(pJoin, ppMergeCond, ppOnCond); +static int32_t pushDownCondOptPartJoinOnCond(SJoinLogicNode* pJoin, SNode** ppPrimKeyEqCond, SNode** ppOnCond) { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOtherOnCond) && + LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOtherOnCond))->condType) { + return pushDownCondOptPartJoinOnCondLogicCond(pJoin, ppPrimKeyEqCond, ppOnCond); } - if (pushDownCondOptIsPriKeyEqualCond(pJoin, pJoin->pOnConditions)) { - *ppMergeCond = nodesCloneNode(pJoin->pOnConditions); + if (pushDownCondOptIsPriKeyEqualCond(pJoin, pJoin->pOtherOnCond)) { + *ppPrimKeyEqCond = nodesCloneNode(pJoin->pOtherOnCond); *ppOnCond = NULL; - nodesDestroyNode(pJoin->pOnConditions); - pJoin->pOnConditions = NULL; + nodesDestroyNode(pJoin->pOtherOnCond); + pJoin->pOtherOnCond = NULL; return TSDB_CODE_SUCCESS; } else { return TSDB_CODE_PLAN_INTERNAL_ERROR; } } -static int32_t pushDownCondOptJoinExtractMergeCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { +static int32_t pushDownCondOptJoinExtractCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { int32_t code = pushDownCondOptCheckJoinOnCond(pCxt, pJoin); - SNode* pJoinMergeCond = NULL; + SNode* pPrimKeyEqCond = NULL; SNode* pJoinOnCond = NULL; if (TSDB_CODE_SUCCESS == code) { - code = pushDownCondOptPartJoinOnCond(pJoin, &pJoinMergeCond, &pJoinOnCond); + code = pushDownCondOptPartJoinOnCond(pJoin, &pPrimKeyEqCond, &pJoinOnCond); } if (TSDB_CODE_SUCCESS == code) { - pJoin->pMergeCondition = pJoinMergeCond; - pJoin->pOnConditions = pJoinOnCond; + pJoin->pPrimKeyEqCond = pPrimKeyEqCond; + pJoin->pOtherOnCond = pJoinOnCond; } else { - nodesDestroyNode(pJoinMergeCond); + nodesDestroyNode(pPrimKeyEqCond); nodesDestroyNode(pJoinOnCond); } return code; @@ -756,7 +756,7 @@ static bool pushDownCondOptIsTableColumn(SNode* pNode, SNodeList* pTableCols) { return pushDownCondOptBelongThisTable(pNode, pTableCols); } -static bool pushDownCondOptIsColEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond) { +static bool pushDownCondOptIsColEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond, bool* allTags) { if (QUERY_NODE_OPERATOR != nodeType(pCond)) { return false; } @@ -775,53 +775,75 @@ static bool pushDownCondOptIsColEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond) } SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets; + bool isEqual = false; if (pushDownCondOptIsTableColumn(pOper->pLeft, pLeftCols)) { - return pushDownCondOptIsTableColumn(pOper->pRight, pRightCols); + isEqual = pushDownCondOptIsTableColumn(pOper->pRight, pRightCols); } else if (pushDownCondOptIsTableColumn(pOper->pLeft, pRightCols)) { - return pushDownCondOptIsTableColumn(pOper->pRight, pLeftCols); + isEqual = pushDownCondOptIsTableColumn(pOper->pRight, pLeftCols); } - return false; + if (isEqual) { + *allTags = (COLUMN_TYPE_TAG == pLeft->colType) && (COLUMN_TYPE_TAG == pRight->colType); + } + return isEqual; } -static int32_t pushDownCondOptJoinExtractColEqualOnLogicCond(SJoinLogicNode* pJoin) { - SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOnConditions); +static int32_t pushDownCondOptJoinExtractEqualOnLogicCond(SJoinLogicNode* pJoin) { + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOtherOnCond); int32_t code = TSDB_CODE_SUCCESS; - SNodeList* pEqualOnConds = NULL; + SNodeList* pColEqOnConds = NULL; + SNodeList* pTagEqOnConds = NULL; SNode* pCond = NULL; + bool allTags = false; FOREACH(pCond, pLogicCond->pParameterList) { - if (pushDownCondOptIsColEqualOnCond(pJoin, pCond)) { - code = nodesListMakeAppend(&pEqualOnConds, nodesCloneNode(pCond)); + if (pushDownCondOptIsColEqualOnCond(pJoin, pCond, &allTags)) { + if (allTags) { + code = nodesListMakeAppend(&pTagEqOnConds, nodesCloneNode(pCond)); + } else { + code = nodesListMakeAppend(&pColEqOnConds, nodesCloneNode(pCond)); + } } } SNode* pTempTagEqCond = NULL; + SNode* pTempColEqCond = NULL; if (TSDB_CODE_SUCCESS == code) { - code = nodesMergeConds(&pTempTagEqCond, &pEqualOnConds); + code = nodesMergeConds(&pTempColEqCond, &pColEqOnConds); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesMergeConds(&pTempTagEqCond, &pTagEqOnConds); } if (TSDB_CODE_SUCCESS == code) { - pJoin->pColEqualOnConditions = pTempTagEqCond; + pJoin->pColEqCond = pTempColEqCond; + pJoin->pTagEqCond = pTempTagEqCond; return TSDB_CODE_SUCCESS; } else { - nodesDestroyList(pEqualOnConds); + nodesDestroyList(pColEqOnConds); + nodesDestroyList(pTagEqOnConds); return TSDB_CODE_PLAN_INTERNAL_ERROR; } return TSDB_CODE_SUCCESS; } -static int32_t pushDownCondOptJoinExtractColEqualOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { - if (NULL == pJoin->pOnConditions) { - pJoin->pColEqualOnConditions = NULL; +static int32_t pushDownCondOptJoinExtractEqualOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { + if (NULL == pJoin->pOtherOnCond) { + pJoin->pColEqCond = NULL; + pJoin->pTagEqCond = NULL; return TSDB_CODE_SUCCESS; } - if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions) && - LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOnConditions))->condType) { - return pushDownCondOptJoinExtractColEqualOnLogicCond(pJoin); + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOtherOnCond) && + LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOtherOnCond))->condType) { + return pushDownCondOptJoinExtractEqualOnLogicCond(pJoin); } - if (pushDownCondOptIsColEqualOnCond(pJoin, pJoin->pOnConditions)) { - pJoin->pColEqualOnConditions = nodesCloneNode(pJoin->pOnConditions); + bool allTags = false; + if (pushDownCondOptIsColEqualOnCond(pJoin, pJoin->pOtherOnCond, &allTags)) { + if (allTags) { + pJoin->pTagEqCond = nodesCloneNode(pJoin->pOtherOnCond); + } else { + pJoin->pColEqCond = nodesCloneNode(pJoin->pOtherOnCond); + } } return TSDB_CODE_SUCCESS; @@ -833,7 +855,7 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p } if (NULL == pJoin->node.pConditions) { - int32_t code = pushDownCondOptJoinExtractMergeCond(pCxt, pJoin); + int32_t code = pushDownCondOptJoinExtractCond(pCxt, pJoin); if (TSDB_CODE_SUCCESS == code) { OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); pCxt->optimized = true; @@ -858,11 +880,11 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p } if (TSDB_CODE_SUCCESS == code) { - code = pushDownCondOptJoinExtractMergeCond(pCxt, pJoin); + code = pushDownCondOptJoinExtractCond(pCxt, pJoin); } if (TSDB_CODE_SUCCESS == code) { - code = pushDownCondOptJoinExtractColEqualOnCond(pCxt, pJoin); + code = pushDownCondOptJoinExtractEqualOnCond(pCxt, pJoin); } if (TSDB_CODE_SUCCESS == code) { @@ -1792,10 +1814,16 @@ static bool eliminateProjOptCanChildConditionUseChildTargets(SLogicNode* pChild, nodesWalkExpr(pChild->pConditions, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); if (!cxt.canUse) return false; } - if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild) && NULL != ((SJoinLogicNode*)pChild)->pOnConditions) { + if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild)) { SJoinLogicNode* pJoinLogicNode = (SJoinLogicNode*)pChild; CheckNewChildTargetsCxt cxt = {.pNewChildTargets = pNewChildTargets, .canUse = false}; - nodesWalkExpr(pJoinLogicNode->pOnConditions, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); + nodesWalkExpr(pJoinLogicNode->pPrimKeyEqCond, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); + if (!cxt.canUse) return false; + nodesWalkExpr(pJoinLogicNode->pColEqCond, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); + if (!cxt.canUse) return false; + nodesWalkExpr(pJoinLogicNode->pTagEqCond, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); + if (!cxt.canUse) return false; + nodesWalkExpr(pJoinLogicNode->pOtherOnCond, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); if (!cxt.canUse) return false; } return true; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index b3d94a5e47..653072ace1 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -679,7 +679,7 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren pJoin->joinType = pJoinLogicNode->joinType; pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder; - setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pMergeCondition, + setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pJoin->pMergeCondition); if (TSDB_CODE_SUCCESS == code) { code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets, @@ -689,12 +689,12 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc); } - if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOnConditions) { + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) { SNodeList* pCondCols = nodesMakeList(); if (NULL == pCondCols) { code = TSDB_CODE_OUT_OF_MEMORY; } else { - code = nodesCollectColumnsFromNode(pJoinLogicNode->pOnConditions, NULL, COLLECT_COL_TYPE_ALL, &pCondCols); + code = nodesCollectColumnsFromNode(pJoinLogicNode->pOtherOnCond, NULL, COLLECT_COL_TYPE_ALL, &pCondCols); } if (TSDB_CODE_SUCCESS == code) { code = addDataBlockSlots(pCxt, pCondCols, pJoin->node.pOutputDataBlockDesc); @@ -702,13 +702,13 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren nodesDestroyList(pCondCols); } - if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOnConditions) { + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) { code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, - pJoinLogicNode->pOnConditions, &pJoin->pOnConditions); + pJoinLogicNode->pOtherOnCond, &pJoin->pOnConditions); } - if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqualOnConditions) { - code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqualOnConditions, &pJoin->pColEqualOnConditions); + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) { + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqualOnConditions); } if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin); diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index b9af716929..821967dfa4 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1706,6 +1706,31 @@ int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pO return TSDB_CODE_SUCCESS; } +int32_t qTbUidFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + char* p = colDataGetNumData(pInput->columnData, 0); + + int32_t code = colDataSetNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows, true); + if (code) { + return code; + } + + pOutput->numOfRows += pInput->numOfRows; + return TSDB_CODE_SUCCESS; +} + +int32_t qVgIdFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + char* p = colDataGetNumData(pInput->columnData, 0); + + int32_t code = colDataSetNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows, true); + if (code) { + return code; + } + + pOutput->numOfRows += pInput->numOfRows; + return TSDB_CODE_SUCCESS; +} + + /** Aggregation functions **/ int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { SColumnInfoData *pInputData = pInput->columnData;