From e41cd44c51e76c4ac8ef811a1cbbb2622ccf3b2e Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 26 Feb 2025 16:30:36 +0800 Subject: [PATCH] enh: add merge join primary expr type --- include/libs/nodes/querynodes.h | 2 + source/libs/executor/inc/mergejoin.h | 17 +++- source/libs/executor/src/mergejoinoperator.c | 92 ++++++++++++++------ source/libs/nodes/src/nodesCloneFuncs.c | 9 +- source/libs/parser/inc/parInt.h | 4 +- source/libs/parser/src/parCalcConst.c | 5 ++ source/libs/parser/src/parTranslater.c | 20 +++-- 7 files changed, 103 insertions(+), 46 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 3c15ffa6b4..ff0ab597ff 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -61,6 +61,7 @@ typedef struct SExprNode { bool asAlias; bool asParam; bool asPosition; + bool joinSrc; int32_t projIdx; int32_t relatedTo; int32_t bindExprID; @@ -209,6 +210,7 @@ typedef struct STableNode { char tableAlias[TSDB_TABLE_NAME_LEN]; uint8_t precision; bool singleTable; + bool inJoin; } STableNode; struct STableMeta; diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index ceb0037b8d..a67e31760c 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -40,6 +40,11 @@ typedef enum EJoinTableType { E_JOIN_TB_PROBE } EJoinTableType; +typedef enum EPrimExprType { + E_PRIM_TIMETRUNCATE = 1, + E_PRIM_VALUE +} EPrimExprType; + #define MJOIN_TBTYPE(_type) (E_JOIN_TB_BUILD == (_type) ? "BUILD" : "PROBE") #define IS_FULL_OUTER_JOIN(_jtype, _stype) ((_jtype) == JOIN_TYPE_FULL && (_stype) == JOIN_STYPE_OUTER) @@ -87,9 +92,15 @@ typedef struct SMJoinNMatchCtx { // for now timetruncate only typedef struct SMJoinPrimExprCtx { - int64_t truncateUnit; - int64_t timezoneUnit; - int32_t targetSlotId; + EPrimExprType type; + + // FOR TIMETRUNCATE + int64_t truncateUnit; + int64_t timezoneUnit; + int32_t targetSlotId; + + // FOR VALUE + int64_t constTs; } SMJoinPrimExprCtx; typedef struct SMJoinTableCtx { diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 3edef48ed1..d7f7f0d085 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -894,21 +894,7 @@ static int32_t mJoinInitFinColsInfo(SMJoinTableCtx* pTable, SNodeList* pList) { return TSDB_CODE_SUCCESS; } -static int32_t mJoinInitPrimExprCtx(SNode* pNode, SMJoinPrimExprCtx* pCtx, SMJoinTableCtx* pTable) { - if (NULL == pNode) { - pCtx->targetSlotId = pTable->primCol->srcSlot; - return TSDB_CODE_SUCCESS; - } - - if (QUERY_NODE_TARGET != nodeType(pNode)) { - return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; - } - - STargetNode* pTarget = (STargetNode*)pNode; - if (QUERY_NODE_FUNCTION != nodeType(pTarget->pExpr)) { - return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; - } - +static int32_t mJoinInitFuncPrimExprCtx(SMJoinPrimExprCtx* pCtx, STargetNode* pTarget) { SFunctionNode* pFunc = (SFunctionNode*)pTarget->pExpr; if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) { return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; @@ -939,6 +925,47 @@ static int32_t mJoinInitPrimExprCtx(SNode* pNode, SMJoinPrimExprCtx* pCtx, SMJoi pCtx->timezoneUnit = offsetFromTz(varDataVal(pTimeZone->datum.p), TSDB_TICK_PER_SECOND(pFunc->node.resType.precision)); } + pCtx->type = E_PRIM_TIMETRUNCATE; + + return TSDB_CODE_SUCCESS; +} + +static int32_t mJoinInitValPrimExprCtx(SMJoinPrimExprCtx* pCtx, STargetNode* pTarget) { + SValueNode* pVal = (SValueNode*)pTarget->pExpr; + if (TSDB_DATA_TYPE_TIMESTAMP != pVal->node.resType.type) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + + pCtx->constTs = pVal->datum.i; + pCtx->type = E_PRIM_VALUE; + + return TSDB_CODE_SUCCESS; +} + + +static int32_t mJoinInitPrimExprCtx(SNode* pNode, SMJoinPrimExprCtx* pCtx, SMJoinTableCtx* pTable) { + if (NULL == pNode) { + pCtx->targetSlotId = pTable->primCol->srcSlot; + return TSDB_CODE_SUCCESS; + } + + if (QUERY_NODE_TARGET != nodeType(pNode)) { + qError("primary expr node is not target, type:%d", nodeType(pNode)); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + + STargetNode* pTarget = (STargetNode*)pNode; + if (QUERY_NODE_FUNCTION != nodeType(pTarget->pExpr) && QUERY_NODE_VALUE != nodeType(pTarget->pExpr)) { + qError("Invalid primary expr node type:%d", nodeType(pTarget->pExpr)); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + + if (QUERY_NODE_FUNCTION == nodeType(pTarget->pExpr)) { + MJ_ERR_RET(mJoinInitFuncPrimExprCtx(pCtx, pTarget)); + } else if (QUERY_NODE_VALUE == nodeType(pTarget->pExpr)) { + MJ_ERR_RET(mJoinInitValPrimExprCtx(pCtx, pTarget)); + } + pCtx->targetSlotId = pTarget->slotId; return TSDB_CODE_SUCCESS; @@ -1045,25 +1072,36 @@ int32_t mJoinLaunchPrimExpr(SSDataBlock* pBlock, SMJoinTableCtx* pTable) { return TSDB_CODE_SUCCESS; } - SMJoinPrimExprCtx* pCtx = &pTable->primCtx; - SColumnInfoData* pPrimIn = taosArrayGet(pBlock->pDataBlock, pTable->primCol->srcSlot); - if (NULL == pPrimIn) { - return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; - } - SColumnInfoData* pPrimOut = taosArrayGet(pBlock->pDataBlock, pTable->primCtx.targetSlotId); if (NULL == pPrimOut) { return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; } - if (0 != pCtx->timezoneUnit) { - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - ((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] - (((int64_t*)pPrimIn->pData)[i] + pCtx->timezoneUnit) % pCtx->truncateUnit; + SMJoinPrimExprCtx* pCtx = &pTable->primCtx; + switch (pCtx->type) { + case E_PRIM_TIMETRUNCATE: { + SColumnInfoData* pPrimIn = taosArrayGet(pBlock->pDataBlock, pTable->primCol->srcSlot); + if (NULL == pPrimIn) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + + if (0 != pCtx->timezoneUnit) { + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + ((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] - (((int64_t*)pPrimIn->pData)[i] + pCtx->timezoneUnit) % pCtx->truncateUnit; + } + } else { + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + ((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] / pCtx->truncateUnit * pCtx->truncateUnit; + } + } + break; } - } else { - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - ((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] / pCtx->truncateUnit * pCtx->truncateUnit; + case E_PRIM_VALUE: { + MJ_ERR_RET(colDataSetNItems(pPrimOut, 0, (char*)&pCtx->constTs, pBlock->info.rows, false)); + break; } + default: + break; } return TSDB_CODE_SUCCESS; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index ff796df0f6..313ab854bc 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -105,6 +105,7 @@ static int32_t exprNodeCopy(const SExprNode* pSrc, SExprNode* pDst) { COPY_SCALAR_FIELD(asAlias); COPY_SCALAR_FIELD(asParam); COPY_SCALAR_FIELD(asPosition); + COPY_SCALAR_FIELD(joinSrc); COPY_SCALAR_FIELD(projIdx); COPY_SCALAR_FIELD(relatedTo); COPY_SCALAR_FIELD(bindExprID); @@ -242,6 +243,7 @@ static int32_t tableNodeCopy(const STableNode* pSrc, STableNode* pDst) { COPY_CHAR_ARRAY_FIELD(tableAlias); COPY_SCALAR_FIELD(precision); COPY_SCALAR_FIELD(singleTable); + COPY_SCALAR_FIELD(inJoin); return TSDB_CODE_SUCCESS; } @@ -321,8 +323,6 @@ static int32_t joinTableNodeCopy(const SJoinTableNode* pSrc, SJoinTableNode* pDs CLONE_NODE_FIELD(addPrimCond); COPY_SCALAR_FIELD(hasSubQuery); COPY_SCALAR_FIELD(isLowLevelJoin); - COPY_SCALAR_FIELD(leftNoOrderQuery); - COPY_SCALAR_FIELD(rightNoOrderQuery); CLONE_NODE_FIELD(pLeft); CLONE_NODE_FIELD(pRight); CLONE_NODE_FIELD(pOnCond); @@ -544,11 +544,8 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { COPY_SCALAR_FIELD(grpJoin); COPY_SCALAR_FIELD(hashJoinHint); COPY_SCALAR_FIELD(batchScanHint); - COPY_SCALAR_FIELD(leftNoOrderQuery); - COPY_SCALAR_FIELD(rightNoOrderQuery); COPY_SCALAR_FIELD(noPrimKeyEqCond); - CLONE_NODE_FIELD(pLeftConstPrim); - CLONE_NODE_FIELD(pRightConstPrim); + COPY_SCALAR_FIELD(constPrimGot); CLONE_NODE_FIELD(pLeftOnCond); CLONE_NODE_FIELD(pRightOnCond); COPY_SCALAR_FIELD(timeRangeTarget); diff --git a/source/libs/parser/inc/parInt.h b/source/libs/parser/inc/parInt.h index 5999ada70f..4b5d106318 100644 --- a/source/libs/parser/inc/parInt.h +++ b/source/libs/parser/inc/parInt.h @@ -42,12 +42,12 @@ int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, SSDa int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, SSDataBlock* pBlock); int32_t translatePostCreateTSMA(SParseContext* pParseCxt, SQuery* pQuery, SSDataBlock* pBlock); int32_t buildQueryAfterParse(SQuery** pQuery, SNode* pRootNode, int16_t placeholderNo, SArray** pPlaceholderValues); -int32_t translateTable(STranslateContext* pCxt, SNode** pTable, SNode* pJoinParent); +int32_t translateTable(STranslateContext* pCxt, SNode** pTable, bool inJoin); int32_t getMetaDataFromHash(const char* pKey, int32_t len, SHashObj* pHash, void** pOutput); void tfreeSParseQueryRes(void* p); #ifdef TD_ENTERPRISE -int32_t translateView(STranslateContext* pCxt, SNode** pTable, SName* pName); +int32_t translateView(STranslateContext* pCxt, SNode** pTable, SName* pName, bool inJoin); int32_t getViewMetaFromMetaCache(STranslateContext* pCxt, SName* pName, SViewMeta** ppViewMeta); #endif #ifdef __cplusplus diff --git a/source/libs/parser/src/parCalcConst.c b/source/libs/parser/src/parCalcConst.c index 0f8c8ee034..a84de17438 100644 --- a/source/libs/parser/src/parCalcConst.c +++ b/source/libs/parser/src/parCalcConst.c @@ -207,6 +207,7 @@ static int32_t findAndReplaceNode(SCalcConstContext* pCxt, SNode** pRoot, SNode* static int32_t calcConstProject(SCalcConstContext* pCxt, SNode* pProject, bool dual, SNode** pNew) { SArray* pAssociation = NULL; + if (NULL != ((SExprNode*)pProject)->pAssociation) { pAssociation = taosArrayDup(((SExprNode*)pProject)->pAssociation, NULL); if (NULL == pAssociation) { @@ -227,6 +228,10 @@ static int32_t calcConstProject(SCalcConstContext* pCxt, SNode* pProject, bool d for (int32_t i = 0; i < size; ++i) { SAssociationNode* pAssNode = taosArrayGet(pAssociation, i); SNode** pCol = pAssNode->pPlace; + if (((SExprNode*)pAssNode->pAssociationNode)->joinSrc) { + continue; + } + if (*pCol == pAssNode->pAssociationNode) { tstrncpy(aliasName, ((SExprNode*)*pCol)->aliasName, TSDB_COL_NAME_LEN); SArray* pOrigAss = NULL; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 9d8b255d35..0d8255b2d6 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1383,7 +1383,7 @@ static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* p pCol->numOfPKs = pTable->pMeta->tableInfo.numOfPKs; } -static int32_t setColumnInfoByExpr(STempTableNode* pTable, SExprNode* pExpr, SColumnNode** pColRef) { +static int32_t setColumnInfoByExpr(STempTableNode* pTable, SExprNode* pExpr, SColumnNode** pColRef, bool joinSrc) { SColumnNode* pCol = *pColRef; if (NULL == pExpr->pAssociation) { @@ -1413,6 +1413,7 @@ static int32_t setColumnInfoByExpr(STempTableNode* pTable, SExprNode* pExpr, SCo tstrncpy(pCol->node.userAlias, pExpr->userAlias, TSDB_COL_NAME_LEN); } pCol->node.resType = pExpr->resType; + pCol->node.joinSrc = pTable->table.inJoin && joinSrc; return TSDB_CODE_SUCCESS; } @@ -1494,7 +1495,7 @@ static int32_t createColumnsByTable(STranslateContext* pCxt, const STableNode* p code = nodesListStrictAppend(pList, (SNode*)pCol); if (TSDB_CODE_SUCCESS == code) { SListCell* pCell = nodesListGetCell(pList, LIST_LENGTH(pList) - 1); - code = setColumnInfoByExpr(pTempTable, (SExprNode*)pNode, (SColumnNode**)&pCell->pNode); + code = setColumnInfoByExpr(pTempTable, (SExprNode*)pNode, (SColumnNode**)&pCell->pNode, true); } if (TSDB_CODE_SUCCESS == code) { if (!skipProjRef) @@ -1591,7 +1592,7 @@ static int32_t findAndSetColumn(STranslateContext* pCxt, SColumnNode** pColRef, } } if (pFoundExpr) { - code = setColumnInfoByExpr(pTempTable, pFoundExpr, pColRef); + code = setColumnInfoByExpr(pTempTable, pFoundExpr, pColRef, SQL_CLAUSE_FROM != pCxt->currClause); if (TSDB_CODE_SUCCESS != code) { return code; } @@ -5097,9 +5098,12 @@ static int32_t setJoinTimeLineResMode(STranslateContext* pCxt) { return TSDB_CODE_SUCCESS; } -int32_t translateTable(STranslateContext* pCxt, SNode** pTable, SNode* pJoinParent) { +int32_t translateTable(STranslateContext* pCxt, SNode** pTable, bool inJoin) { SSelectStmt* pCurrSmt = (SSelectStmt*)(pCxt->pCurrStmt); int32_t code = TSDB_CODE_SUCCESS; + + ((STableNode*)*pTable)->inJoin = inJoin; + switch (nodeType(*pTable)) { case QUERY_NODE_REAL_TABLE: { SRealTableNode* pRealTable = (SRealTableNode*)*pTable; @@ -5115,7 +5119,7 @@ int32_t translateTable(STranslateContext* pCxt, SNode** pTable, SNode* pJoinPare } #ifdef TD_ENTERPRISE if (TSDB_VIEW_TABLE == pRealTable->pMeta->tableType && (!pCurrSmt->tagScan || pCxt->pParseCxt->biMode)) { - return translateView(pCxt, pTable, &name); + return translateView(pCxt, pTable, &name, inJoin); } code = translateAudit(pCxt, pRealTable, &name); #endif @@ -5172,10 +5176,10 @@ int32_t translateTable(STranslateContext* pCxt, SNode** pTable, SNode* pJoinPare SJoinTableNode* pJoinTable = (SJoinTableNode*)*pTable; code = translateJoinTable(pCxt, pJoinTable); if (TSDB_CODE_SUCCESS == code) { - code = translateTable(pCxt, &pJoinTable->pLeft, (SNode*)pJoinTable); + code = translateTable(pCxt, &pJoinTable->pLeft, true); } if (TSDB_CODE_SUCCESS == code) { - code = translateTable(pCxt, &pJoinTable->pRight, (SNode*)pJoinTable); + code = translateTable(pCxt, &pJoinTable->pRight, true); } if (TSDB_CODE_SUCCESS == code) { code = checkJoinTable(pCxt, pJoinTable); @@ -7132,7 +7136,7 @@ static int32_t translateWhere(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t translateFrom(STranslateContext* pCxt, SNode** pTable) { pCxt->currClause = SQL_CLAUSE_FROM; - return translateTable(pCxt, pTable, NULL); + return translateTable(pCxt, pTable, false); } static int32_t checkLimit(STranslateContext* pCxt, SSelectStmt* pSelect) {