enh: add uid/vgid functions
This commit is contained in:
parent
9972aa81d6
commit
ce2635074d
|
@ -122,6 +122,8 @@ typedef enum EFunctionType {
|
|||
FUNCTION_TYPE_IROWTS,
|
||||
FUNCTION_TYPE_ISFILLED,
|
||||
FUNCTION_TYPE_TAGS,
|
||||
FUNCTION_TYPE_TBUID,
|
||||
FUNCTION_TYPE_VGID,
|
||||
|
||||
// internal function
|
||||
FUNCTION_TYPE_SELECT_VALUE = 3750,
|
||||
|
|
|
@ -110,10 +110,11 @@ typedef struct SScanLogicNode {
|
|||
typedef struct SJoinLogicNode {
|
||||
SLogicNode node;
|
||||
EJoinType joinType;
|
||||
SNode* pMergeCondition;
|
||||
SNode* pOnConditions;
|
||||
SNode* pPrimKeyEqCond;
|
||||
SNode* pColEqCond;
|
||||
SNode* pTagEqCond;
|
||||
SNode* pOtherOnCond;
|
||||
bool isSingleTableJoin;
|
||||
SNode* pColEqualOnConditions;
|
||||
} SJoinLogicNode;
|
||||
|
||||
typedef struct SAggLogicNode {
|
||||
|
|
|
@ -95,6 +95,8 @@ int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
|||
int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
|
||||
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t qTbUidFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t qVgIdFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
|
||||
/* Aggregation functions */
|
||||
int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
|
|
|
@ -575,7 +575,7 @@ static int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, S
|
|||
}
|
||||
}
|
||||
|
||||
int32_t code = getHJoinValBufSize(pJoin->pRowBufs, getJoinValBufSize(pTable, rowIdx), &pTable->valData, pRow);
|
||||
int32_t code = getValBufFromPages(pJoin->pRowBufs, getHJoinValBufSize(pTable, rowIdx), &pTable->valData, pRow);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -2503,9 +2503,13 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes,
|
|||
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
|
||||
|
||||
// refactor later
|
||||
if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
|
||||
if (FUNCTION_TYPE_TBNAME == pExprInfo[j].pExpr->_function.functionType) {
|
||||
STR_TO_VARSTR(str, (*mr).me.name);
|
||||
colDataSetVal(pDst, (count), str, false);
|
||||
} else if (FUNCTION_TYPE_TBUID == pExprInfo[j].pExpr->_function.functionType) {
|
||||
colDataSetVal(pDst, (count), (char*)&(*mr).me.uid, false);
|
||||
} else if (FUNCTION_TYPE_VGID == pExprInfo[j].pExpr->_function.functionType) {
|
||||
colDataSetVal(pDst, (count), (char*)&pTaskInfo->id.vgId, false);
|
||||
} else { // it is a tag value
|
||||
STagVal val = {0};
|
||||
val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
|
||||
|
|
|
@ -679,6 +679,21 @@ static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateTbUidColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
// pseudo column do not need to check parameters
|
||||
pFunc->node.resType =
|
||||
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateVgIdColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
// pseudo column do not need to check parameters
|
||||
pFunc->node.resType =
|
||||
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes, .type = TSDB_DATA_TYPE_INT};
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||
if (2 != numOfParams) {
|
||||
|
@ -3517,6 +3532,27 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.sprocessFunc = containsProperlyFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "_tbuid",
|
||||
.type = FUNCTION_TYPE_TBUID,
|
||||
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
|
||||
.translateFunc = translateTbUidColumn,
|
||||
.getEnvFunc = NULL,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = qTbUidFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "_vgid",
|
||||
.type = FUNCTION_TYPE_VGID,
|
||||
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
|
||||
.translateFunc = translateVgIdColumn,
|
||||
.getEnvFunc = NULL,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = qVgIdFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
|
||||
};
|
||||
// clang-format on
|
||||
|
||||
|
|
|
@ -403,9 +403,10 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
|
|||
static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
|
||||
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
|
||||
COPY_SCALAR_FIELD(joinType);
|
||||
CLONE_NODE_FIELD(pMergeCondition);
|
||||
CLONE_NODE_FIELD(pOnConditions);
|
||||
CLONE_NODE_FIELD(pColEqualOnConditions);
|
||||
CLONE_NODE_FIELD(pPrimKeyEqCond);
|
||||
CLONE_NODE_FIELD(pColEqCond);
|
||||
CLONE_NODE_FIELD(pTagEqCond);
|
||||
CLONE_NODE_FIELD(pOtherOnCond);
|
||||
COPY_SCALAR_FIELD(isSingleTableJoin);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -1427,8 +1427,8 @@ static int32_t jsonToLogicPlan(const SJson* pJson, void* pObj) {
|
|||
|
||||
static const char* jkJoinLogicPlanJoinType = "JoinType";
|
||||
static const char* jkJoinLogicPlanOnConditions = "OnConditions";
|
||||
static const char* jkJoinLogicPlanMergeCondition = "MergeConditions";
|
||||
static const char* jkJoinLogicPlanColEqualOnConditions = "ColumnEqualOnConditions";
|
||||
static const char* jkJoinLogicPlanPrimKeyEqCondition = "PrimKeyEqCond";
|
||||
static const char* jkJoinLogicPlanColEqCondition = "ColumnEqCond";
|
||||
|
||||
static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SJoinLogicNode* pNode = (const SJoinLogicNode*)pObj;
|
||||
|
@ -1438,13 +1438,13 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) {
|
|||
code = tjsonAddIntegerToObject(pJson, jkJoinLogicPlanJoinType, pNode->joinType);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkJoinLogicPlanMergeCondition, nodeToJson, pNode->pMergeCondition);
|
||||
code = tjsonAddObject(pJson, jkJoinLogicPlanPrimKeyEqCondition, nodeToJson, pNode->pPrimKeyEqCond);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOnConditions);
|
||||
code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOtherOnCond);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkJoinLogicPlanColEqualOnConditions, nodeToJson, pNode->pColEqualOnConditions);
|
||||
code = tjsonAddObject(pJson, jkJoinLogicPlanColEqCondition, nodeToJson, pNode->pColEqCond);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -1457,13 +1457,13 @@ static int32_t jsonToLogicJoinNode(const SJson* pJson, void* pObj) {
|
|||
tjsonGetNumberValue(pJson, jkJoinLogicPlanJoinType, pNode->joinType, code);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkJoinLogicPlanMergeCondition, &pNode->pMergeCondition);
|
||||
code = jsonToNodeObject(pJson, jkJoinLogicPlanPrimKeyEqCondition, &pNode->pPrimKeyEqCond);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pOnConditions);
|
||||
code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pOtherOnCond);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkJoinLogicPlanColEqualOnConditions, &pNode->pColEqualOnConditions);
|
||||
code = jsonToNodeObject(pJson, jkJoinLogicPlanColEqCondition, &pNode->pColEqCond);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -1090,9 +1090,9 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
case QUERY_NODE_LOGIC_PLAN_JOIN: {
|
||||
SJoinLogicNode* pLogicNode = (SJoinLogicNode*)pNode;
|
||||
destroyLogicNode((SLogicNode*)pLogicNode);
|
||||
nodesDestroyNode(pLogicNode->pMergeCondition);
|
||||
nodesDestroyNode(pLogicNode->pOnConditions);
|
||||
nodesDestroyNode(pLogicNode->pColEqualOnConditions);
|
||||
nodesDestroyNode(pLogicNode->pPrimKeyEqCond);
|
||||
nodesDestroyNode(pLogicNode->pOtherOnCond);
|
||||
nodesDestroyNode(pLogicNode->pColEqCond);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_LOGIC_PLAN_AGG: {
|
||||
|
|
|
@ -467,8 +467,8 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
|
||||
// set on conditions
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoinTable->pOnCond) {
|
||||
pJoin->pOnConditions = nodesCloneNode(pJoinTable->pOnCond);
|
||||
if (NULL == pJoin->pOnConditions) {
|
||||
pJoin->pOtherOnCond = nodesCloneNode(pJoinTable->pOnCond);
|
||||
if (NULL == pJoin->pOtherOnCond) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -615,7 +615,7 @@ static int32_t pushDownCondOptPartCond(SJoinLogicNode* pJoin, SNode** pOnCond, S
|
|||
}
|
||||
|
||||
static int32_t pushDownCondOptPushCondToOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode** pCond) {
|
||||
return pushDownCondOptAppendCond(&pJoin->pOnConditions, pCond);
|
||||
return pushDownCondOptAppendCond(&pJoin->pOtherOnCond, pCond);
|
||||
}
|
||||
|
||||
static int32_t pushDownCondOptPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) {
|
||||
|
@ -674,24 +674,24 @@ static bool pushDownCondOptContainPriKeyEqualCond(SJoinLogicNode* pJoin, SNode*
|
|||
}
|
||||
|
||||
static int32_t pushDownCondOptCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||
if (NULL == pJoin->pOnConditions) {
|
||||
if (NULL == pJoin->pOtherOnCond) {
|
||||
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN);
|
||||
}
|
||||
if (!pushDownCondOptContainPriKeyEqualCond(pJoin, pJoin->pOnConditions)) {
|
||||
if (!pushDownCondOptContainPriKeyEqualCond(pJoin, pJoin->pOtherOnCond)) {
|
||||
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNode** ppMergeCond, SNode** ppOnCond) {
|
||||
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOnConditions);
|
||||
static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNode** ppPrimKeyEqCond, SNode** ppOnCond) {
|
||||
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOtherOnCond);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SNodeList* pOnConds = NULL;
|
||||
SNode* pCond = NULL;
|
||||
FOREACH(pCond, pLogicCond->pParameterList) {
|
||||
if (pushDownCondOptIsPriKeyEqualCond(pJoin, pCond)) {
|
||||
*ppMergeCond = nodesCloneNode(pCond);
|
||||
*ppPrimKeyEqCond = nodesCloneNode(pCond);
|
||||
} else {
|
||||
code = nodesListMakeAppend(&pOnConds, nodesCloneNode(pCond));
|
||||
}
|
||||
|
@ -702,10 +702,10 @@ static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNo
|
|||
code = nodesMergeConds(&pTempOnCond, &pOnConds);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != *ppMergeCond) {
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != *ppPrimKeyEqCond) {
|
||||
*ppOnCond = pTempOnCond;
|
||||
nodesDestroyNode(pJoin->pOnConditions);
|
||||
pJoin->pOnConditions = NULL;
|
||||
nodesDestroyNode(pJoin->pOtherOnCond);
|
||||
pJoin->pOtherOnCond = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
nodesDestroyList(pOnConds);
|
||||
|
@ -714,35 +714,35 @@ static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNo
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t pushDownCondOptPartJoinOnCond(SJoinLogicNode* pJoin, SNode** ppMergeCond, SNode** ppOnCond) {
|
||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions) &&
|
||||
LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOnConditions))->condType) {
|
||||
return pushDownCondOptPartJoinOnCondLogicCond(pJoin, ppMergeCond, ppOnCond);
|
||||
static int32_t pushDownCondOptPartJoinOnCond(SJoinLogicNode* pJoin, SNode** ppPrimKeyEqCond, SNode** ppOnCond) {
|
||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOtherOnCond) &&
|
||||
LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOtherOnCond))->condType) {
|
||||
return pushDownCondOptPartJoinOnCondLogicCond(pJoin, ppPrimKeyEqCond, ppOnCond);
|
||||
}
|
||||
|
||||
if (pushDownCondOptIsPriKeyEqualCond(pJoin, pJoin->pOnConditions)) {
|
||||
*ppMergeCond = nodesCloneNode(pJoin->pOnConditions);
|
||||
if (pushDownCondOptIsPriKeyEqualCond(pJoin, pJoin->pOtherOnCond)) {
|
||||
*ppPrimKeyEqCond = nodesCloneNode(pJoin->pOtherOnCond);
|
||||
*ppOnCond = NULL;
|
||||
nodesDestroyNode(pJoin->pOnConditions);
|
||||
pJoin->pOnConditions = NULL;
|
||||
nodesDestroyNode(pJoin->pOtherOnCond);
|
||||
pJoin->pOtherOnCond = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t pushDownCondOptJoinExtractMergeCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||
static int32_t pushDownCondOptJoinExtractCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||
int32_t code = pushDownCondOptCheckJoinOnCond(pCxt, pJoin);
|
||||
SNode* pJoinMergeCond = NULL;
|
||||
SNode* pPrimKeyEqCond = NULL;
|
||||
SNode* pJoinOnCond = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = pushDownCondOptPartJoinOnCond(pJoin, &pJoinMergeCond, &pJoinOnCond);
|
||||
code = pushDownCondOptPartJoinOnCond(pJoin, &pPrimKeyEqCond, &pJoinOnCond);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pJoin->pMergeCondition = pJoinMergeCond;
|
||||
pJoin->pOnConditions = pJoinOnCond;
|
||||
pJoin->pPrimKeyEqCond = pPrimKeyEqCond;
|
||||
pJoin->pOtherOnCond = pJoinOnCond;
|
||||
} else {
|
||||
nodesDestroyNode(pJoinMergeCond);
|
||||
nodesDestroyNode(pPrimKeyEqCond);
|
||||
nodesDestroyNode(pJoinOnCond);
|
||||
}
|
||||
return code;
|
||||
|
@ -756,7 +756,7 @@ static bool pushDownCondOptIsTableColumn(SNode* pNode, SNodeList* pTableCols) {
|
|||
return pushDownCondOptBelongThisTable(pNode, pTableCols);
|
||||
}
|
||||
|
||||
static bool pushDownCondOptIsColEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond) {
|
||||
static bool pushDownCondOptIsColEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond, bool* allTags) {
|
||||
if (QUERY_NODE_OPERATOR != nodeType(pCond)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -775,53 +775,75 @@ static bool pushDownCondOptIsColEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond)
|
|||
}
|
||||
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
|
||||
SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets;
|
||||
bool isEqual = false;
|
||||
if (pushDownCondOptIsTableColumn(pOper->pLeft, pLeftCols)) {
|
||||
return pushDownCondOptIsTableColumn(pOper->pRight, pRightCols);
|
||||
isEqual = pushDownCondOptIsTableColumn(pOper->pRight, pRightCols);
|
||||
} else if (pushDownCondOptIsTableColumn(pOper->pLeft, pRightCols)) {
|
||||
return pushDownCondOptIsTableColumn(pOper->pRight, pLeftCols);
|
||||
isEqual = pushDownCondOptIsTableColumn(pOper->pRight, pLeftCols);
|
||||
}
|
||||
return false;
|
||||
if (isEqual) {
|
||||
*allTags = (COLUMN_TYPE_TAG == pLeft->colType) && (COLUMN_TYPE_TAG == pRight->colType);
|
||||
}
|
||||
return isEqual;
|
||||
}
|
||||
|
||||
static int32_t pushDownCondOptJoinExtractColEqualOnLogicCond(SJoinLogicNode* pJoin) {
|
||||
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOnConditions);
|
||||
static int32_t pushDownCondOptJoinExtractEqualOnLogicCond(SJoinLogicNode* pJoin) {
|
||||
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOtherOnCond);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SNodeList* pEqualOnConds = NULL;
|
||||
SNodeList* pColEqOnConds = NULL;
|
||||
SNodeList* pTagEqOnConds = NULL;
|
||||
SNode* pCond = NULL;
|
||||
bool allTags = false;
|
||||
FOREACH(pCond, pLogicCond->pParameterList) {
|
||||
if (pushDownCondOptIsColEqualOnCond(pJoin, pCond)) {
|
||||
code = nodesListMakeAppend(&pEqualOnConds, nodesCloneNode(pCond));
|
||||
if (pushDownCondOptIsColEqualOnCond(pJoin, pCond, &allTags)) {
|
||||
if (allTags) {
|
||||
code = nodesListMakeAppend(&pTagEqOnConds, nodesCloneNode(pCond));
|
||||
} else {
|
||||
code = nodesListMakeAppend(&pColEqOnConds, nodesCloneNode(pCond));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SNode* pTempTagEqCond = NULL;
|
||||
SNode* pTempColEqCond = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesMergeConds(&pTempTagEqCond, &pEqualOnConds);
|
||||
code = nodesMergeConds(&pTempColEqCond, &pColEqOnConds);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesMergeConds(&pTempTagEqCond, &pTagEqOnConds);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pJoin->pColEqualOnConditions = pTempTagEqCond;
|
||||
pJoin->pColEqCond = pTempColEqCond;
|
||||
pJoin->pTagEqCond = pTempTagEqCond;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
nodesDestroyList(pEqualOnConds);
|
||||
nodesDestroyList(pColEqOnConds);
|
||||
nodesDestroyList(pTagEqOnConds);
|
||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t pushDownCondOptJoinExtractColEqualOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||
if (NULL == pJoin->pOnConditions) {
|
||||
pJoin->pColEqualOnConditions = NULL;
|
||||
static int32_t pushDownCondOptJoinExtractEqualOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||
if (NULL == pJoin->pOtherOnCond) {
|
||||
pJoin->pColEqCond = NULL;
|
||||
pJoin->pTagEqCond = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions) &&
|
||||
LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOnConditions))->condType) {
|
||||
return pushDownCondOptJoinExtractColEqualOnLogicCond(pJoin);
|
||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOtherOnCond) &&
|
||||
LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOtherOnCond))->condType) {
|
||||
return pushDownCondOptJoinExtractEqualOnLogicCond(pJoin);
|
||||
}
|
||||
|
||||
if (pushDownCondOptIsColEqualOnCond(pJoin, pJoin->pOnConditions)) {
|
||||
pJoin->pColEqualOnConditions = nodesCloneNode(pJoin->pOnConditions);
|
||||
bool allTags = false;
|
||||
if (pushDownCondOptIsColEqualOnCond(pJoin, pJoin->pOtherOnCond, &allTags)) {
|
||||
if (allTags) {
|
||||
pJoin->pTagEqCond = nodesCloneNode(pJoin->pOtherOnCond);
|
||||
} else {
|
||||
pJoin->pColEqCond = nodesCloneNode(pJoin->pOtherOnCond);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -833,7 +855,7 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p
|
|||
}
|
||||
|
||||
if (NULL == pJoin->node.pConditions) {
|
||||
int32_t code = pushDownCondOptJoinExtractMergeCond(pCxt, pJoin);
|
||||
int32_t code = pushDownCondOptJoinExtractCond(pCxt, pJoin);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE);
|
||||
pCxt->optimized = true;
|
||||
|
@ -858,11 +880,11 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p
|
|||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = pushDownCondOptJoinExtractMergeCond(pCxt, pJoin);
|
||||
code = pushDownCondOptJoinExtractCond(pCxt, pJoin);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = pushDownCondOptJoinExtractColEqualOnCond(pCxt, pJoin);
|
||||
code = pushDownCondOptJoinExtractEqualOnCond(pCxt, pJoin);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -1792,10 +1814,16 @@ static bool eliminateProjOptCanChildConditionUseChildTargets(SLogicNode* pChild,
|
|||
nodesWalkExpr(pChild->pConditions, eliminateProjOptCanUseNewChildTargetsImpl, &cxt);
|
||||
if (!cxt.canUse) return false;
|
||||
}
|
||||
if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild) && NULL != ((SJoinLogicNode*)pChild)->pOnConditions) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild)) {
|
||||
SJoinLogicNode* pJoinLogicNode = (SJoinLogicNode*)pChild;
|
||||
CheckNewChildTargetsCxt cxt = {.pNewChildTargets = pNewChildTargets, .canUse = false};
|
||||
nodesWalkExpr(pJoinLogicNode->pOnConditions, eliminateProjOptCanUseNewChildTargetsImpl, &cxt);
|
||||
nodesWalkExpr(pJoinLogicNode->pPrimKeyEqCond, eliminateProjOptCanUseNewChildTargetsImpl, &cxt);
|
||||
if (!cxt.canUse) return false;
|
||||
nodesWalkExpr(pJoinLogicNode->pColEqCond, eliminateProjOptCanUseNewChildTargetsImpl, &cxt);
|
||||
if (!cxt.canUse) return false;
|
||||
nodesWalkExpr(pJoinLogicNode->pTagEqCond, eliminateProjOptCanUseNewChildTargetsImpl, &cxt);
|
||||
if (!cxt.canUse) return false;
|
||||
nodesWalkExpr(pJoinLogicNode->pOtherOnCond, eliminateProjOptCanUseNewChildTargetsImpl, &cxt);
|
||||
if (!cxt.canUse) return false;
|
||||
}
|
||||
return true;
|
||||
|
|
|
@ -679,7 +679,7 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
|
|||
|
||||
pJoin->joinType = pJoinLogicNode->joinType;
|
||||
pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder;
|
||||
setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pMergeCondition,
|
||||
setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond,
|
||||
&pJoin->pMergeCondition);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets,
|
||||
|
@ -689,12 +689,12 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
|
|||
code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOnConditions) {
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) {
|
||||
SNodeList* pCondCols = nodesMakeList();
|
||||
if (NULL == pCondCols) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
code = nodesCollectColumnsFromNode(pJoinLogicNode->pOnConditions, NULL, COLLECT_COL_TYPE_ALL, &pCondCols);
|
||||
code = nodesCollectColumnsFromNode(pJoinLogicNode->pOtherOnCond, NULL, COLLECT_COL_TYPE_ALL, &pCondCols);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addDataBlockSlots(pCxt, pCondCols, pJoin->node.pOutputDataBlockDesc);
|
||||
|
@ -702,13 +702,13 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
|
|||
nodesDestroyList(pCondCols);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOnConditions) {
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) {
|
||||
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1,
|
||||
pJoinLogicNode->pOnConditions, &pJoin->pOnConditions);
|
||||
pJoinLogicNode->pOtherOnCond, &pJoin->pOnConditions);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqualOnConditions) {
|
||||
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqualOnConditions, &pJoin->pColEqualOnConditions);
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) {
|
||||
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqualOnConditions);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
|
||||
|
|
|
@ -1706,6 +1706,31 @@ int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pO
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qTbUidFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
char* p = colDataGetNumData(pInput->columnData, 0);
|
||||
|
||||
int32_t code = colDataSetNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows, true);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
pOutput->numOfRows += pInput->numOfRows;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qVgIdFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
char* p = colDataGetNumData(pInput->columnData, 0);
|
||||
|
||||
int32_t code = colDataSetNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows, true);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
pOutput->numOfRows += pInput->numOfRows;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/** Aggregation functions **/
|
||||
int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
SColumnInfoData *pInputData = pInput->columnData;
|
||||
|
|
Loading…
Reference in New Issue