From 86db8efcda4ff825db17c7a5a29ddcc96098c1b0 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 10 Apr 2024 19:27:36 +0800 Subject: [PATCH] enh: support hash join --- include/common/ttokendef.h | 1 + include/libs/nodes/plannodes.h | 12 +- include/libs/nodes/querynodes.h | 1 + source/libs/command/inc/commandInt.h | 1 + source/libs/command/src/explain.c | 8 +- source/libs/executor/inc/hashjoin.h | 32 ++- source/libs/executor/src/hashjoin.c | 31 +++ source/libs/executor/src/hashjoinoperator.c | 279 +++++++++++--------- source/libs/nodes/src/nodesCloneFuncs.c | 4 + source/libs/nodes/src/nodesCodeFuncs.c | 34 ++- source/libs/nodes/src/nodesMsgFuncs.c | 36 ++- source/libs/nodes/src/nodesUtilFuncs.c | 7 +- source/libs/parser/src/parAstCreater.c | 11 + source/libs/parser/src/parTokenizer.c | 1 + source/libs/planner/inc/planInt.h | 2 + source/libs/planner/src/planLogicCreater.c | 6 +- source/libs/planner/src/planOptimizer.c | 227 +++++++++++++++- source/libs/planner/src/planPhysiCreater.c | 16 +- source/libs/planner/src/planUtil.c | 23 ++ 19 files changed, 593 insertions(+), 139 deletions(-) create mode 100755 source/libs/executor/src/hashjoin.c diff --git a/include/common/ttokendef.h b/include/common/ttokendef.h index 440c099922..95e75a4f46 100644 --- a/include/common/ttokendef.h +++ b/include/common/ttokendef.h @@ -399,6 +399,7 @@ #define TK_PARTITION_FIRST 609 #define TK_PARA_TABLES_SORT 610 #define TK_SMALLDATA_TS_SORT 611 +#define TK_HASH_JOIN 612 #define TK_NK_NIL 65535 diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index c3f5382348..d2d58e3da6 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -148,6 +148,12 @@ typedef struct SJoinLogicNode { bool isLowLevelJoin; bool seqWinGroup; bool grpJoin; + bool hashJoinHint; + + // FOR HASH JOIN + STimeWindow timeRange; //table onCond filter + SNode* pLeftOnCond; //table onCond filter + SNode* pRightOnCond; //table onCond filter } SJoinLogicNode; typedef struct 
SAggLogicNode { @@ -521,10 +527,14 @@ typedef struct SHashJoinPhysiNode { SNode* pJLimit; SNodeList* pOnLeft; SNodeList* pOnRight; - SNode* pFilterConditions; + STimeWindow timeRange; //table onCond filter + SNode* pLeftOnCond; //table onCond filter + SNode* pRightOnCond; //table onCond filter + SNode* pFullOnCond; //preFilter SNodeList* pTargets; SQueryStat inputStat[2]; + // only in planner internal SNode* pPrimKeyCond; SNode* pColEqCond; SNode* pTagEqCond; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index bed3aa4d2a..f9395b5a7b 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -138,6 +138,7 @@ typedef enum EHintOption { HINT_PARTITION_FIRST, HINT_PARA_TABLES_SORT, HINT_SMALLDATA_TS_SORT, + HINT_HASH_JOIN, } EHintOption; typedef struct SHintNode { diff --git a/source/libs/command/inc/commandInt.h b/source/libs/command/inc/commandInt.h index e580238d22..3178515706 100644 --- a/source/libs/command/inc/commandInt.h +++ b/source/libs/command/inc/commandInt.h @@ -121,6 +121,7 @@ extern "C" { #define EXPLAIN_JLIMIT_FORMAT "jlimit=%" PRId64 #define EXPLAIN_SEQ_WIN_GRP_FORMAT "seq_win_grp=%d" #define EXPLAIN_GRP_JOIN_FORMAT "group_join=%d" +#define EXPLAIN_JOIN_ALGO "algo=%s" #define COMMAND_RESET_LOG "resetLog" #define COMMAND_SCHEDULE_POLICY "schedulePolicy" diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 80c56d690d..fbe77b7987 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -639,6 +639,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pJoinNode->node.inputTsOrder)); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_JOIN_ALGO, "Merge"); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, 
tlen, level)); @@ -1683,6 +1684,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pJoinNode->node.inputTsOrder)); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_JOIN_ALGO, "Hash"); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); @@ -1698,17 +1700,17 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - if (pJoinNode->node.pConditions || pJoinNode->pFilterConditions) { + if (pJoinNode->node.pConditions || pJoinNode->pFullOnCond) { EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); if (pJoinNode->node.pConditions) { QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); } - if (pJoinNode->pFilterConditions) { + if (pJoinNode->pFullOnCond) { if (pJoinNode->node.pConditions) { EXPLAIN_ROW_APPEND(" AND "); } - QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pFilterConditions, tbuf + VARSTR_HEADER_SIZE, + QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pFullOnCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); } EXPLAIN_ROW_END(); diff --git a/source/libs/executor/inc/hashjoin.h b/source/libs/executor/inc/hashjoin.h index 07a78f9008..78fea8b792 100755 --- a/source/libs/executor/inc/hashjoin.h +++ b/source/libs/executor/inc/hashjoin.h @@ -20,6 +20,10 @@ extern "C" { #endif #define HASH_JOIN_DEFAULT_PAGE_SIZE 10485760 +#define HJOIN_DEFAULT_BLK_ROWS_NUM 4096 +#define HJOIN_BLK_SIZE_LIMIT 10485760 +#define HJOIN_ROW_BITMAP_SIZE (2 * 1048576) +#define HJOIN_BLK_THRESHOLD_RATIO 0.9 #pragma pack(push, 1) typedef struct SBufRowInfo { @@ -31,6 +35,7 @@ typedef struct SBufRowInfo { typedef struct SHJoinCtx { bool rowRemains; + int64_t limit; 
SBufRowInfo* pBuildRow; SSDataBlock* pProbeData; int32_t probeIdx; @@ -94,17 +99,40 @@ typedef struct SHJoinOperatorInfo { SHJoinTableInfo tbs[2]; SHJoinTableInfo* pBuild; SHJoinTableInfo* pProbe; - SSDataBlock* pRes; + SFilterInfo* pPreFilter; + SFilterInfo* pFinFilter; + SSDataBlock* finBlk; + SSDataBlock* midBlk; int32_t pResColNum; int8_t* pResColMap; SArray* pRowBufs; - SNode* pCond; SSHashObj* pKeyHash; bool keyHashBuilt; SHJoinCtx ctx; SHJoinExecInfo execInfo; + int32_t blkThreshold; } SHJoinOperatorInfo; + +#define HJ_ERR_RET(c) \ + do { \ + int32_t _code = (c); \ + if (_code != TSDB_CODE_SUCCESS) { \ + terrno = _code; \ + return _code; \ + } \ + } while (0) + +#define HJ_ERR_JRET(c) \ + do { \ + code = (c); \ + if (code != TSDB_CODE_SUCCESS) { \ + terrno = code; \ + goto _return; \ + } \ + } while (0) + + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/hashjoin.c b/source/libs/executor/src/hashjoin.c new file mode 100755 index 0000000000..cfcc8ece50 --- /dev/null +++ b/source/libs/executor/src/hashjoin.c @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */ + +#include "executorInt.h" +#include "filter.h" +#include "function.h" +#include "operator.h" +#include "os.h" +#include "querynodes.h" +#include "querytask.h" +#include "tcompare.h" +#include "tdatablock.h" +#include "thash.h" +#include "tmsg.h" +#include "ttypes.h" +#include "hashjoin.h" + + + diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index b23c2175ec..610c4be61a 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -28,7 +28,7 @@ #include "hashjoin.h" -static int64_t getSingleKeyRowsNum(SBufRowInfo* pRow) { +static int64_t hJoinGetSingleKeyRowsNum(SBufRowInfo* pRow) { int64_t rows = 0; while (pRow) { rows++; @@ -37,14 +37,14 @@ static int64_t getSingleKeyRowsNum(SBufRowInfo* pRow) { return rows; } -static int64_t getRowsNumOfKeyHash(SSHashObj* pHash) { +static int64_t hJoinGetRowsNumOfKeyHash(SSHashObj* pHash) { SGroupData* pGroup = NULL; int32_t iter = 0; int64_t rowsNum = 0; while (NULL != (pGroup = tSimpleHashIterate(pHash, pGroup, &iter))) { int32_t* pKey = tSimpleHashGetKey(pGroup, NULL); - int64_t rows = getSingleKeyRowsNum(pGroup->rows); + int64_t rows = hJoinGetSingleKeyRowsNum(pGroup->rows); qTrace("build_key:%d, rows:%" PRId64, *pKey, rows); rowsNum += rows; } @@ -52,7 +52,7 @@ static int64_t getRowsNumOfKeyHash(SSHashObj* pHash) { return rowsNum; } -static int32_t initHJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { +static int32_t hJoinInitKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { pTable->keyNum = LIST_LENGTH(pList); pTable->keyCols = taosMemoryMalloc(pTable->keyNum * sizeof(SHJoinColInfo)); @@ -82,7 +82,7 @@ static int32_t initHJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { return TSDB_CODE_SUCCESS; } -static void getHJoinValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) { +static void hJoinGetValColsNum(SNodeList* pList, int32_t blkId, int32_t* colNum) { *colNum = 0; SNode* 
pNode = NULL; @@ -95,7 +95,7 @@ static void getHJoinValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) } } -static bool valColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo* pKeys, int32_t* pKeyIdx) { +static bool hJoinIsValColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo* pKeys, int32_t* pKeyIdx) { for (int32_t i = 0; i < keyNum; ++i) { if (pKeys[i].srcSlot == slotId) { *pKeyIdx = i; @@ -106,8 +106,8 @@ static bool valColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo* pKeys return false; } -static int32_t initHJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { - getHJoinValColNum(pList, pTable->blkId, &pTable->valNum); +static int32_t hJoinInitValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { + hJoinGetValColsNum(pList, pTable->blkId, &pTable->valNum); if (pTable->valNum == 0) { return TSDB_CODE_SUCCESS; } @@ -124,7 +124,7 @@ static int32_t initHJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { STargetNode* pTarget = (STargetNode*)pNode; SColumnNode* pColNode = (SColumnNode*)pTarget->pExpr; if (pColNode->dataBlockId == pTable->blkId) { - if (valColInKeyCols(pColNode->slotId, pTable->keyNum, pTable->keyCols, &pTable->valCols[i].srcSlot)) { + if (hJoinIsValColInKeyCols(pColNode->slotId, pTable->keyNum, pTable->keyCols, &pTable->valCols[i].srcSlot)) { pTable->valCols[i].keyCol = true; } else { pTable->valCols[i].keyCol = false; @@ -134,7 +134,7 @@ static int32_t initHJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { } pTable->valCols[i].dstSlot = pTarget->slotId; pTable->valCols[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); - if (pTable->valCols[i].vardata) { + if (pTable->valCols[i].vardata && !pTable->valCols[i].keyCol) { if (NULL == pTable->valVarCols) { pTable->valVarCols = taosArrayInit(pTable->valNum, sizeof(int32_t)); if (NULL == pTable->valVarCols) { @@ -158,7 +158,7 @@ static int32_t initHJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { } -static int32_t 
initHJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) { +static int32_t hJoinInitTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) { SNodeList* pKeyList = NULL; SHJoinTableInfo* pTable = &pJoin->tbs[idx]; pTable->downStream = pDownstream[idx]; @@ -169,11 +169,11 @@ static int32_t initHJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pKeyList = pJoinNode->pOnRight; } - int32_t code = initHJoinKeyColsInfo(pTable, pKeyList); + int32_t code = hJoinInitKeyColsInfo(pTable, pKeyList); if (code) { return code; } - code = initHJoinValColsInfo(pTable, pJoinNode->pTargets); + code = hJoinInitValColsInfo(pTable, pJoinNode->pTargets); if (code) { return code; } @@ -183,7 +183,7 @@ static int32_t initHJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* return TSDB_CODE_SUCCESS; } -static void setHJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { +static void hJoinSetBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { int32_t buildIdx = 0; int32_t probeIdx = 1; @@ -218,7 +218,7 @@ static void setHJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysi pInfo->pProbe->downStreamIdx = probeIdx; } -static int32_t buildHJoinResColMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { +static int32_t hJoinBuildResColsMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { pInfo->pResColNum = pJoinNode->pTargets->length; pInfo->pResColMap = taosMemoryCalloc(pJoinNode->pTargets->length, sizeof(int8_t)); if (NULL == pInfo->pResColMap) { @@ -241,7 +241,7 @@ static int32_t buildHJoinResColMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode } -static FORCE_INLINE int32_t addPageToHJoinBuf(SArray* pRowBufs) { +static FORCE_INLINE int32_t hJoinAddPageToBufs(SArray* pRowBufs) { SBufPageInfo page; page.pageSize = 
HASH_JOIN_DEFAULT_PAGE_SIZE; page.offset = 0; @@ -254,28 +254,28 @@ static FORCE_INLINE int32_t addPageToHJoinBuf(SArray* pRowBufs) { return TSDB_CODE_SUCCESS; } -static int32_t initHJoinBufPages(SHJoinOperatorInfo* pInfo) { +static int32_t hJoinInitBufPages(SHJoinOperatorInfo* pInfo) { pInfo->pRowBufs = taosArrayInit(32, sizeof(SBufPageInfo)); if (NULL == pInfo->pRowBufs) { return TSDB_CODE_OUT_OF_MEMORY; } - return addPageToHJoinBuf(pInfo->pRowBufs); + return hJoinAddPageToBufs(pInfo->pRowBufs); } -static void freeHJoinTableInfo(SHJoinTableInfo* pTable) { +static void hJoinFreeTableInfo(SHJoinTableInfo* pTable) { taosMemoryFreeClear(pTable->keyCols); taosMemoryFreeClear(pTable->keyBuf); taosMemoryFreeClear(pTable->valCols); taosArrayDestroy(pTable->valVarCols); } -static void freeHJoinBufPage(void* param) { +static void hJoinFreeBufPage(void* param) { SBufPageInfo* pInfo = (SBufPageInfo*)param; taosMemoryFree(pInfo->data); } -static void destroyHJoinKeyHash(SSHashObj** ppHash) { +static void hJoinDestroyKeyHash(SSHashObj** ppHash) { if (NULL == ppHash || NULL == (*ppHash)) { return; } @@ -297,30 +297,15 @@ static void destroyHJoinKeyHash(SSHashObj** ppHash) { *ppHash = NULL; } -static void destroyHashJoinOperator(void* param) { - SHJoinOperatorInfo* pJoinOperator = (SHJoinOperatorInfo*)param; - qDebug("hashJoin exec info, buildBlk:%" PRId64 ", buildRows:%" PRId64 ", probeBlk:%" PRId64 ", probeRows:%" PRId64 ", resRows:%" PRId64, - pJoinOperator->execInfo.buildBlkNum, pJoinOperator->execInfo.buildBlkRows, pJoinOperator->execInfo.probeBlkNum, - pJoinOperator->execInfo.probeBlkRows, pJoinOperator->execInfo.resRows); - - destroyHJoinKeyHash(&pJoinOperator->pKeyHash); - - freeHJoinTableInfo(&pJoinOperator->tbs[0]); - freeHJoinTableInfo(&pJoinOperator->tbs[1]); - pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); - taosMemoryFreeClear(pJoinOperator->pResColMap); - taosArrayDestroyEx(pJoinOperator->pRowBufs, freeHJoinBufPage); - 
nodesDestroyNode(pJoinOperator->pCond); - - taosMemoryFreeClear(param); -} - -static FORCE_INLINE char* retrieveColDataFromRowBufs(SArray* pRowBufs, SBufRowInfo* pRow) { +static FORCE_INLINE char* hJoinRetrieveColDataFromRowBufs(SArray* pRowBufs, SBufRowInfo* pRow) { + if ((uint16_t)-1 == pRow->pageId) { + return NULL; + } SBufPageInfo *pPage = taosArrayGet(pRowBufs, pRow->pageId); return pPage->data + pRow->offset; } -static FORCE_INLINE int32_t copyHJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) { +static FORCE_INLINE int32_t hJoinCopyResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) { SHJoinTableInfo* pBuild = pJoin->pBuild; SHJoinTableInfo* pProbe = pJoin->pProbe; int32_t buildIdx = 0, buildValIdx = 0; @@ -329,7 +314,7 @@ static FORCE_INLINE int32_t copyHJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, i int32_t code = 0; for (int32_t r = 0; r < rowNum; ++r) { - char* pData = retrieveColDataFromRowBufs(pJoin->pRowBufs, pRow); + char* pData = hJoinRetrieveColDataFromRowBufs(pJoin->pRowBufs, pRow); char* pValData = pData + pBuild->valBitMapSize; char* pKeyData = pProbe->keyData; buildIdx = buildValIdx = probeIdx = 0; @@ -376,7 +361,7 @@ static FORCE_INLINE int32_t copyHJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, i } -static FORCE_INLINE void appendHJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes, bool* allFetched) { +static FORCE_INLINE void hJoinAppendResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes, bool* allFetched) { SHJoinOperatorInfo* pJoin = pOperator->info; SHJoinCtx* pCtx = &pJoin->ctx; SBufRowInfo* pStart = pCtx->pBuildRow; @@ -391,7 +376,7 @@ static FORCE_INLINE void appendHJoinResToBlock(struct SOperatorInfo* pOperator, pJoin->execInfo.resRows += rowNum; - int32_t code = copyHJoinResRowsToBlock(pJoin, rowNum, pStart, pRes); + int32_t code = hJoinCopyResRowsToBlock(pJoin, rowNum, pStart, pRes); if (code) { 
pOperator->pTaskInfo->code = code; T_LONG_JMP(pOperator->pTaskInfo->env, code); @@ -402,7 +387,7 @@ static FORCE_INLINE void appendHJoinResToBlock(struct SOperatorInfo* pOperator, } -static FORCE_INLINE bool copyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) { +static FORCE_INLINE bool hJoinCopyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) { char *pData = NULL; size_t bufLen = 0; @@ -448,12 +433,12 @@ static void doHashJoinImpl(struct SOperatorInfo* pOperator) { SHJoinOperatorInfo* pJoin = pOperator->info; SHJoinTableInfo* pProbe = pJoin->pProbe; SHJoinCtx* pCtx = &pJoin->ctx; - SSDataBlock* pRes = pJoin->pRes; + SSDataBlock* pRes = pJoin->finBlk; size_t bufLen = 0; bool allFetched = false; if (pJoin->ctx.pBuildRow) { - appendHJoinResToBlock(pOperator, pRes, &allFetched); + hJoinAppendResToBlock(pOperator, pRes, &allFetched); if (pRes->info.rows >= pRes->info.capacity) { if (allFetched) { ++pCtx->probeIdx; @@ -466,7 +451,7 @@ static void doHashJoinImpl(struct SOperatorInfo* pOperator) { } for (; pCtx->probeIdx < pCtx->pProbeData->info.rows; ++pCtx->probeIdx) { - if (copyKeyColsDataToBuf(pProbe, pCtx->probeIdx, &bufLen)) { + if (hJoinCopyKeyColsDataToBuf(pProbe, pCtx->probeIdx, &bufLen)) { continue; } @@ -481,7 +466,7 @@ static void doHashJoinImpl(struct SOperatorInfo* pOperator) { */ if (pGroup) { pCtx->pBuildRow = pGroup->rows; - appendHJoinResToBlock(pOperator, pRes, &allFetched); + hJoinAppendResToBlock(pOperator, pRes, &allFetched); if (pRes->info.rows >= pRes->info.capacity) { if (allFetched) { ++pCtx->probeIdx; @@ -497,7 +482,7 @@ static void doHashJoinImpl(struct SOperatorInfo* pOperator) { pCtx->rowRemains = false; } -static int32_t setKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { +static int32_t hJoinSetKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { for (int32_t i = 0; i < pTable->keyNum; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 
pTable->keyCols[i].srcSlot); if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { @@ -518,7 +503,7 @@ static int32_t setKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { return TSDB_CODE_SUCCESS; } -static int32_t setValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { +static int32_t hJoinSetValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { if (!pTable->valColExist) { return TSDB_CODE_SUCCESS; } @@ -549,7 +534,7 @@ static int32_t setValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { -static FORCE_INLINE void copyValColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx) { +static FORCE_INLINE void hJoinCopyValColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx) { if (!pTable->valColExist) { return; } @@ -583,7 +568,7 @@ static FORCE_INLINE void copyValColsDataToBuf(SHJoinTableInfo* pTable, int32_t r } -static FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char** pBuf, SBufRowInfo* pRow) { +static FORCE_INLINE int32_t hJoinGetValBufFromPages(SArray* pPages, int32_t bufSize, char** pBuf, SBufRowInfo* pRow) { if (0 == bufSize) { pRow->pageId = -1; return TSDB_CODE_SUCCESS; @@ -604,14 +589,14 @@ static FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, return TSDB_CODE_SUCCESS; } - int32_t code = addPageToHJoinBuf(pPages); + int32_t code = hJoinAddPageToBufs(pPages); if (code) { return code; } } while (true); } -static FORCE_INLINE int32_t getHJoinValBufSize(SHJoinTableInfo* pTable, int32_t rowIdx) { +static FORCE_INLINE int32_t hJoinGetValBufSize(SHJoinTableInfo* pTable, int32_t rowIdx) { if (NULL == pTable->valVarCols) { return pTable->valBufSize; } @@ -629,7 +614,7 @@ static FORCE_INLINE int32_t getHJoinValBufSize(SHJoinTableInfo* pTable, int32_t } -static int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTableInfo* pTable, size_t keyLen, int32_t rowIdx) { +static int32_t hJoinAddRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* 
pGroup, SHJoinTableInfo* pTable, size_t keyLen, int32_t rowIdx) { SGroupData group = {0}; SBufRowInfo* pRow = NULL; @@ -646,7 +631,7 @@ static int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, S } } - int32_t code = getValBufFromPages(pJoin->pRowBufs, getHJoinValBufSize(pTable, rowIdx), &pTable->valData, pRow); + int32_t code = hJoinGetValBufFromPages(pJoin->pRowBufs, hJoinGetValBufSize(pTable, rowIdx), &pTable->valData, pRow); if (code) { taosMemoryFree(pRow); return code; @@ -666,37 +651,37 @@ static int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, S return TSDB_CODE_SUCCESS; } -static int32_t addRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, size_t keyLen, int32_t rowIdx) { +static int32_t hJoinAddRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, size_t keyLen, int32_t rowIdx) { SHJoinTableInfo* pBuild = pJoin->pBuild; - int32_t code = setValColsData(pBlock, pBuild); + int32_t code = hJoinSetValColsData(pBlock, pBuild); if (code) { return code; } SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pBuild->keyData, keyLen); - code = addRowToHashImpl(pJoin, pGroup, pBuild, keyLen, rowIdx); + code = hJoinAddRowToHashImpl(pJoin, pGroup, pBuild, keyLen, rowIdx); if (code) { return code; } - copyValColsDataToBuf(pBuild, rowIdx); + hJoinCopyValColsDataToBuf(pBuild, rowIdx); return TSDB_CODE_SUCCESS; } -static int32_t addBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) { +static int32_t hJoinAddBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) { SHJoinTableInfo* pBuild = pJoin->pBuild; - int32_t code = setKeyColsData(pBlock, pBuild); + int32_t code = hJoinSetKeyColsData(pBlock, pBuild); if (code) { return code; } size_t bufLen = 0; for (int32_t i = 0; i < pBlock->info.rows; ++i) { - if (copyKeyColsDataToBuf(pBuild, i, &bufLen)) { + if (hJoinCopyKeyColsDataToBuf(pBuild, i, &bufLen)) { continue; } - code = addRowToHash(pJoin, pBlock, bufLen, i); + code = hJoinAddRowToHash(pJoin, 
pBlock, bufLen, i); if (code) { return code; } @@ -705,7 +690,7 @@ static int32_t addBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin return code; } -static int32_t buildHJoinKeyHash(struct SOperatorInfo* pOperator) { +static int32_t hJoinBuildHash(struct SOperatorInfo* pOperator) { SHJoinOperatorInfo* pJoin = pOperator->info; SSDataBlock* pBlock = NULL; int32_t code = TSDB_CODE_SUCCESS; @@ -719,7 +704,7 @@ static int32_t buildHJoinKeyHash(struct SOperatorInfo* pOperator) { pJoin->execInfo.buildBlkNum++; pJoin->execInfo.buildBlkRows += pBlock->info.rows; - code = addBlockRowsToHash(pBlock, pJoin); + code = hJoinAddBlockRowsToHash(pBlock, pJoin); if (code) { return code; } @@ -728,14 +713,14 @@ static int32_t buildHJoinKeyHash(struct SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } -static int32_t launchBlockHashJoin(struct SOperatorInfo* pOperator, SSDataBlock* pBlock) { +static int32_t hJoinPrepareStart(struct SOperatorInfo* pOperator, SSDataBlock* pBlock) { SHJoinOperatorInfo* pJoin = pOperator->info; SHJoinTableInfo* pProbe = pJoin->pProbe; - int32_t code = setKeyColsData(pBlock, pProbe); + int32_t code = hJoinSetKeyColsData(pBlock, pProbe); if (code) { return code; } - code = setValColsData(pBlock, pProbe); + code = hJoinSetValColsData(pBlock, pProbe); if (code) { return code; } @@ -750,20 +735,20 @@ static int32_t launchBlockHashJoin(struct SOperatorInfo* pOperator, SSDataBlock* return TSDB_CODE_SUCCESS; } -static void setHJoinDone(struct SOperatorInfo* pOperator) { +static void hJoinSetDone(struct SOperatorInfo* pOperator) { setOperatorCompleted(pOperator); SHJoinOperatorInfo* pInfo = pOperator->info; - destroyHJoinKeyHash(&pInfo->pKeyHash); + hJoinDestroyKeyHash(&pInfo->pKeyHash); qDebug("hash Join done"); } -static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { +static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) { SHJoinOperatorInfo* pJoin = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; 
int32_t code = TSDB_CODE_SUCCESS; - SSDataBlock* pRes = pJoin->pRes; + SSDataBlock* pRes = pJoin->finBlk; pRes->info.rows = 0; int64_t st = 0; @@ -778,14 +763,14 @@ static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { if (!pJoin->keyHashBuilt) { pJoin->keyHashBuilt = true; - code = buildHJoinKeyHash(pOperator); + code = hJoinBuildHash(pOperator); if (code) { pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } if (tSimpleHashGetSize(pJoin->pKeyHash) <= 0) { - setHJoinDone(pOperator); + hJoinSetDone(pOperator); goto _return; } @@ -795,8 +780,8 @@ static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { if (pJoin->ctx.rowRemains) { doHashJoinImpl(pOperator); - if (pRes->info.rows >= pRes->info.capacity && pOperator->exprSupp.pFilterInfo != NULL) { - doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); + if (pRes->info.rows >= pRes->info.capacity && pJoin->pFinFilter != NULL) { + doFilter(pRes, pJoin->pFinFilter, NULL); } if (pRes->info.rows > 0) { return pRes; @@ -806,25 +791,25 @@ static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { while (true) { SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, pJoin->pProbe->downStreamIdx); if (NULL == pBlock) { - setHJoinDone(pOperator); + hJoinSetDone(pOperator); break; } pJoin->execInfo.probeBlkNum++; pJoin->execInfo.probeBlkRows += pBlock->info.rows; - code = launchBlockHashJoin(pOperator, pBlock); + code = hJoinPrepareStart(pOperator, pBlock); if (code) { pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } - if (pRes->info.rows < pOperator->resultInfo.threshold) { + if (pRes->info.rows < pJoin->blkThreshold) { continue; } - if (pOperator->exprSupp.pFilterInfo != NULL) { - doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); + if (pJoin->pFinFilter != NULL) { + doFilter(pRes, pJoin->pFinFilter, NULL); } if (pRes->info.rows > 0) { break; @@ -840,6 +825,85 @@ _return: return (pRes->info.rows > 0) ? 
pRes : NULL; } + +static void destroyHashJoinOperator(void* param) { + SHJoinOperatorInfo* pJoinOperator = (SHJoinOperatorInfo*)param; + qDebug("hashJoin exec info, buildBlk:%" PRId64 ", buildRows:%" PRId64 ", probeBlk:%" PRId64 ", probeRows:%" PRId64 ", resRows:%" PRId64, + pJoinOperator->execInfo.buildBlkNum, pJoinOperator->execInfo.buildBlkRows, pJoinOperator->execInfo.probeBlkNum, + pJoinOperator->execInfo.probeBlkRows, pJoinOperator->execInfo.resRows); + + hJoinDestroyKeyHash(&pJoinOperator->pKeyHash); + /* NOTE(review): pPreFilter/pFinFilter are not released in this function - confirm who owns them */ + hJoinFreeTableInfo(&pJoinOperator->tbs[0]); + hJoinFreeTableInfo(&pJoinOperator->tbs[1]); + pJoinOperator->finBlk = blockDataDestroy(pJoinOperator->finBlk); + taosMemoryFreeClear(pJoinOperator->pResColMap); + taosArrayDestroyEx(pJoinOperator->pRowBufs, hJoinFreeBufPage); + pJoinOperator->midBlk = blockDataDestroy(pJoinOperator->midBlk); /* also release the staging block that hJoinInitResBlocks allocates when a pre-filter exists */ + taosMemoryFreeClear(param); +} + +int32_t hJoinHandleConds(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode) { + switch (pJoin->joinType) { + case JOIN_TYPE_INNER: { + SNode* pCond = NULL; + if (pJoinNode->pFullOnCond != NULL) { + if (pJoinNode->node.pConditions != NULL) { + HJ_ERR_RET(mergeJoinConds(&pJoinNode->pFullOnCond, &pJoinNode->node.pConditions)); + } + pCond = pJoinNode->pFullOnCond; + } else if (pJoinNode->node.pConditions != NULL) { + pCond = pJoinNode->node.pConditions; + } + + HJ_ERR_RET(filterInitFromNode(pCond, &pJoin->pFinFilter, 0)); + break; + } + case JOIN_TYPE_LEFT: + case JOIN_TYPE_RIGHT: + case JOIN_TYPE_FULL: + if (pJoinNode->pFullOnCond != NULL) { + HJ_ERR_RET(filterInitFromNode(pJoinNode->pFullOnCond, &pJoin->pPreFilter, 0)); + } + if (pJoinNode->node.pConditions != NULL) { + HJ_ERR_RET(filterInitFromNode(pJoinNode->node.pConditions, &pJoin->pFinFilter, 0)); + } + break; + default: + break; + } + + return TSDB_CODE_SUCCESS; +} + +static uint32_t hJoinGetFinBlkCapacity(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode) { + uint32_t maxRows = TMAX(HJOIN_DEFAULT_BLK_ROWS_NUM, HJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize); +
if (INT64_MAX != pJoin->ctx.limit && NULL == pJoin->pFinFilter) { + uint64_t limitMaxRows = pJoin->ctx.limit / HJOIN_BLK_THRESHOLD_RATIO + 1; /* 64-bit: converting a double above UINT32_MAX to uint32_t is undefined */ + return (maxRows > limitMaxRows) ? (uint32_t)limitMaxRows : maxRows; + } + + return maxRows; +} + + +int32_t hJoinInitResBlocks(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode) { /* allocates finBlk (plus midBlk when a pre-filter is set) and derives blkThreshold; returns an error code if a block cannot be created or sized */ + pJoin->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); + ASSERT(pJoinNode->node.pOutputDataBlockDesc->totalRowSize > 0); + if (NULL == pJoin->finBlk) { return TSDB_CODE_OUT_OF_MEMORY; } /* block creation failed */ + HJ_ERR_RET(blockDataEnsureCapacity(pJoin->finBlk, hJoinGetFinBlkCapacity(pJoin, pJoinNode))); + + if (NULL != pJoin->pPreFilter) { + pJoin->midBlk = createOneDataBlock(pJoin->finBlk, false); + HJ_ERR_RET(NULL == pJoin->midBlk ? TSDB_CODE_OUT_OF_MEMORY : blockDataEnsureCapacity(pJoin->midBlk, pJoin->finBlk->info.capacity)); + } + + pJoin->blkThreshold = pJoin->finBlk->info.capacity * HJOIN_BLK_THRESHOLD_RATIO; + + return TSDB_CODE_SUCCESS; +} + + SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { SHJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SHJoinOperatorInfo)); @@ -851,23 +915,22 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n goto _error; } - int32_t numOfCols = 0; - pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); - initResultSizeInfo(&pOperator->resultInfo, 4096); - blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + pInfo->ctx.limit = pJoinNode->node.pLimit ?
((SLimitNode*)pJoinNode->node.pLimit)->limit : INT64_MAX; + + hJoinInitResBlocks(pInfo, pJoinNode); setOperatorInfo(pOperator, "HashJoinOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - initHJoinTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); - initHJoinTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); + hJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); + hJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); - setHJoinBuildAndProbeTable(pInfo, pJoinNode); - code = buildHJoinResColMap(pInfo, pJoinNode); + hJoinSetBuildAndProbeTable(pInfo, pJoinNode); + code = hJoinBuildResColsMap(pInfo, pJoinNode); if (code) { goto _error; } - code = initHJoinBufPages(pInfo); + code = hJoinInitBufPages(pInfo); if (code) { goto _error; } @@ -879,42 +942,14 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n goto _error; } - if (pJoinNode->pFilterConditions != NULL && pJoinNode->node.pConditions != NULL) { - pInfo->pCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); - if (pInfo->pCond == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - - SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCond); - pLogicCond->pParameterList = nodesMakeList(); - if (pLogicCond->pParameterList == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - - nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pFilterConditions)); - nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions)); - pLogicCond->condType = LOGIC_COND_TYPE_AND; - } else if (pJoinNode->pFilterConditions != NULL) { - pInfo->pCond = nodesCloneNode(pJoinNode->pFilterConditions); - } else if (pJoinNode->node.pConditions != NULL) { - pInfo->pCond = nodesCloneNode(pJoinNode->node.pConditions); - } else { - pInfo->pCond = NULL; - } - - code = filterInitFromNode(pInfo->pCond, 
&pOperator->exprSupp.pFilterInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + HJ_ERR_JRET(hJoinHandleConds(pInfo, pJoinNode)); code = appendDownstream(pOperator, pDownstream, numOfDownstream); if (code != TSDB_CODE_SUCCESS) { goto _error; } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hJoinMainProcess, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); qDebug("create hash Join operator done"); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 1b36801d6b..0fe48ad282 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -493,6 +493,10 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { COPY_SCALAR_FIELD(isLowLevelJoin); COPY_SCALAR_FIELD(seqWinGroup); COPY_SCALAR_FIELD(grpJoin); + COPY_SCALAR_FIELD(hashJoinHint); + CLONE_NODE_FIELD(pLeftOnCond); + CLONE_NODE_FIELD(pRightOnCond); + COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow)); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 74f952938b..f7521a7b13 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2185,6 +2185,10 @@ static const char* jkJoinPhysiPlanLeftInputRowSize = "LeftInputRowSize"; static const char* jkJoinPhysiPlanRightInputRowSize = "RightInputRowSize"; static const char* jkJoinPhysiPlanSeqWinGroup = "SeqWinGroup"; static const char* jkJoinPhysiPlanGroupJoin = "GroupJoin"; +static const char* jkJoinPhysiPlanLeftOnCond = "LeftOnCond"; +static const char* jkJoinPhysiPlanRightOnCond = "RightOnCond"; +static const char* jkJoinPhysiPlanTimeRangeSKey = "TimeRangeSKey"; +static const char* jkJoinPhysiPlanTimeRangeEKey = "TimeRangeEKey"; 
static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) { const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj; @@ -2336,7 +2340,7 @@ static int32_t physiHashJoinNodeToJson(const void* pObj, SJson* pJson) { code = nodeListToJson(pJson, jkJoinPhysiPlanOnRightCols, pNode->pOnRight); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pFilterConditions); + code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pFullOnCond); } if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets); @@ -2353,6 +2357,19 @@ static int32_t physiHashJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanRightInputRowSize, pNode->inputStat[1].inputRowSize); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinPhysiPlanLeftOnCond, nodeToJson, pNode->pLeftOnCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinPhysiPlanRightOnCond, nodeToJson, pNode->pRightOnCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanTimeRangeSKey, pNode->timeRange.skey); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanTimeRangeEKey, pNode->timeRange.ekey); + } + return code; } @@ -2371,7 +2388,7 @@ static int32_t jsonToPhysiHashJoinNode(const SJson* pJson, void* pObj) { code = jsonToNodeList(pJson, jkJoinPhysiPlanOnRightCols, &pNode->pOnRight); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pFilterConditions); + code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pFullOnCond); } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets); @@ -2388,6 +2405,19 @@ static int32_t jsonToPhysiHashJoinNode(const SJson* 
pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkJoinPhysiPlanRightInputRowSize, pNode->inputStat[1].inputRowSize, code); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinPhysiPlanLeftOnCond, &pNode->pLeftOnCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinPhysiPlanRightOnCond, &pNode->pRightOnCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkJoinPhysiPlanTimeRangeSKey, &pNode->timeRange.skey); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkJoinPhysiPlanTimeRangeEKey, &pNode->timeRange.ekey); + } + return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index e75f5414f0..5934aff63b 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2648,7 +2648,12 @@ enum { PHY_HASH_JOIN_CODE_INPUT_ROW_NUM0, PHY_HASH_JOIN_CODE_INPUT_ROW_SIZE0, PHY_HASH_JOIN_CODE_INPUT_ROW_NUM1, - PHY_HASH_JOIN_CODE_INPUT_ROW_SIZE1 + PHY_HASH_JOIN_CODE_INPUT_ROW_SIZE1, + PHY_HASH_JOIN_CODE_LEFT_ON_COND, + PHY_HASH_JOIN_CODE_RIGHT_ON_COND, + PHY_HASH_JOIN_CODE_TIME_RANGE_SKEY, + PHY_HASH_JOIN_CODE_TIME_RANGE_EKEY, + }; static int32_t physiHashJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -2665,7 +2670,7 @@ static int32_t physiHashJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_ON_RIGHT_COLUMN, nodeListToMsg, pNode->pOnRight); } if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_ON_CONDITIONS, nodeToMsg, pNode->pFilterConditions); + code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_ON_CONDITIONS, nodeToMsg, pNode->pFullOnCond); } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_TARGETS, nodeListToMsg, pNode->pTargets); @@ -2682,6 +2687,19 @@ static int32_t physiHashJoinNodeToMsg(const void* pObj, STlvEncoder* 
pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeI32(pEncoder, PHY_HASH_JOIN_CODE_INPUT_ROW_SIZE1, pNode->inputStat[1].inputRowSize); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_LEFT_ON_COND, nodeToMsg, pNode->pLeftOnCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_RIGHT_ON_COND, nodeToMsg, pNode->pRightOnCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_HASH_JOIN_CODE_TIME_RANGE_SKEY, pNode->timeRange.skey); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_HASH_JOIN_CODE_TIME_RANGE_EKEY, pNode->timeRange.ekey); + } + return code; } @@ -2706,7 +2724,7 @@ static int32_t msgToPhysiHashJoinNode(STlvDecoder* pDecoder, void* pObj) { code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pOnRight); break; case PHY_HASH_JOIN_CODE_ON_CONDITIONS: - code = msgToNodeFromTlv(pTlv, (void**)&pNode->pFilterConditions); + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pFullOnCond); break; case PHY_HASH_JOIN_CODE_TARGETS: code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); @@ -2723,6 +2741,18 @@ static int32_t msgToPhysiHashJoinNode(STlvDecoder* pDecoder, void* pObj) { case PHY_HASH_JOIN_CODE_INPUT_ROW_SIZE1: code = tlvDecodeI32(pTlv, &pNode->inputStat[1].inputRowSize); break; + case PHY_HASH_JOIN_CODE_LEFT_ON_COND: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pLeftOnCond); + break; + case PHY_HASH_JOIN_CODE_RIGHT_ON_COND: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pRightOnCond); + break; + case PHY_HASH_JOIN_CODE_TIME_RANGE_SKEY: + code = tlvDecodeI64(pTlv, &pNode->timeRange.skey); + break; + case PHY_HASH_JOIN_CODE_TIME_RANGE_EKEY: + code = tlvDecodeI64(pTlv, &pNode->timeRange.ekey); + break; default: break; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 8d4990b228..896425c867 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ 
b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1296,6 +1296,8 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pLogicNode->pFullOnCond); nodesDestroyList(pLogicNode->pLeftEqNodes); nodesDestroyList(pLogicNode->pRightEqNodes); + nodesDestroyNode(pLogicNode->pLeftOnCond); + nodesDestroyNode(pLogicNode->pRightOnCond); break; } case QUERY_NODE_LOGIC_PLAN_AGG: { @@ -1457,12 +1459,15 @@ void nodesDestroyNode(SNode* pNode) { destroyPhysiNode((SPhysiNode*)pPhyNode); nodesDestroyList(pPhyNode->pOnLeft); nodesDestroyList(pPhyNode->pOnRight); - nodesDestroyNode(pPhyNode->pFilterConditions); + nodesDestroyNode(pPhyNode->pFullOnCond); nodesDestroyList(pPhyNode->pTargets); nodesDestroyNode(pPhyNode->pPrimKeyCond); nodesDestroyNode(pPhyNode->pColEqCond); nodesDestroyNode(pPhyNode->pTagEqCond); + + nodesDestroyNode(pPhyNode->pLeftOnCond); + nodesDestroyNode(pPhyNode->pRightOnCond); break; } case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: { diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 87c5d563d7..c55208feda 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -480,6 +480,9 @@ bool addHintNodeToList(SAstCreateContext* pCxt, SNodeList** ppHintList, EHintOpt case HINT_SMALLDATA_TS_SORT: if (paramNum > 0 || hasHint(*ppHintList, HINT_SMALLDATA_TS_SORT)) return true; break; + case HINT_HASH_JOIN: + if (paramNum > 0 || hasHint(*ppHintList, HINT_HASH_JOIN)) return true; + break; default: return true; } @@ -574,6 +577,14 @@ SNodeList* createHintNodeList(SAstCreateContext* pCxt, const SToken* pLiteral) { } opt = HINT_SMALLDATA_TS_SORT; break; + case TK_HASH_JOIN: + lastComma = false; + if (0 != opt || inParamList) { + quit = true; + break; + } + opt = HINT_HASH_JOIN; + break; case TK_NK_LP: lastComma = false; if (0 == opt || inParamList) { diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index 4c9c9d0ea5..f9f40e9c76 100644 --- 
a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -118,6 +118,7 @@ static SKeyword keywordTable[] = { {"LOGS", TK_LOGS}, {"MACHINES", TK_MACHINES}, {"GROUP", TK_GROUP}, + {"HASH_JOIN", TK_HASH_JOIN}, {"HAVING", TK_HAVING}, {"HOST", TK_HOST}, {"IF", TK_IF}, diff --git a/source/libs/planner/inc/planInt.h b/source/libs/planner/inc/planInt.h index fd3ae94717..24b31f9f37 100644 --- a/source/libs/planner/inc/planInt.h +++ b/source/libs/planner/inc/planInt.h @@ -49,6 +49,7 @@ bool getBatchScanOptionFromHint(SNodeList* pList); bool getSortForGroupOptHint(SNodeList* pList); bool getParaTablesSortOptHint(SNodeList* pList); bool getSmallDataTsSortOptHint(SNodeList* pList); +bool getHashJoinOptHint(SNodeList* pList); bool getOptHint(SNodeList* pList, EHintOption hint); SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr); int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes); @@ -58,6 +59,7 @@ bool isPartTableWinodw(SWindowLogicNode* pWindow); bool keysHasCol(SNodeList* pKeys); bool keysHasTbname(SNodeList* pKeys); SFunctionNode* createGroupKeyAggFunc(SColumnNode* pGroupCol); +int32_t getTimeRangeFromNode(SNode** pPrimaryKeyCond, STimeWindow* pTimeRange, bool* pIsStrict); #define CLONE_LIMIT 1 #define CLONE_SLIMIT 1 << 1 diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index cdcb3578b3..b4a4879378 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -548,14 +548,16 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pJoin->hasSubQuery = pJoinTable->hasSubQuery; pJoin->node.inputTsOrder = ORDER_ASC; pJoin->node.groupAction = GROUP_ACTION_CLEAR; - pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; + pJoin->hashJoinHint = getHashJoinOptHint(pSelect->pHint); + pJoin->node.requireDataOrder = pJoin->hashJoinHint ? 
DATA_ORDER_LEVEL_NONE : DATA_ORDER_LEVEL_GLOBAL; pJoin->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; pJoin->isLowLevelJoin = pJoinTable->isLowLevelJoin; pJoin->pWindowOffset = nodesCloneNode(pJoinTable->pWindowOffset); pJoin->pJLimit = nodesCloneNode(pJoinTable->pJLimit); pJoin->addPrimEqCond = nodesCloneNode(pJoinTable->addPrimCond); pJoin->node.pChildren = nodesMakeList(); - pJoin->seqWinGroup = (JOIN_STYPE_WIN == pJoinTable->subType) && (pSelect->hasAggFuncs || pSelect->hasIndefiniteRowsFunc); + pJoin->seqWinGroup = (JOIN_STYPE_WIN == pJoinTable->subType) && (pSelect->hasAggFuncs || pSelect->hasIndefiniteRowsFunc); + if (NULL == pJoin->node.pChildren) { code = TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 7892e66a44..56091fe5d9 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -4729,6 +4729,230 @@ static int32_t sortNonPriKeyOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog return TSDB_CODE_SUCCESS; } +static bool hashJoinOptShouldBeOptimized(SLogicNode* pNode) { + bool res = false; + if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode)) { + return res; + } + + SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode; + if (pJoin->joinAlgo != JOIN_ALGO_UNKNOWN) { + return res; + } + + if (!pJoin->hashJoinHint) { + goto _return; + } + + if ((JOIN_STYPE_NONE != pJoin->subType && JOIN_STYPE_OUTER != pJoin->subType) || NULL != pJoin->pTagOnCond || NULL != pJoin->pColOnCond || pNode->pChildren->length != 2 ) { + goto _return; + } + + res = true; + +_return: + + if (!res && DATA_ORDER_LEVEL_NONE == pJoin->node.requireDataOrder) { + pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; + adjustLogicNodeDataRequirement(pNode, pJoin->node.requireDataOrder); + } + + return res; +} + +static int32_t hashJoinOptSplitPrimFromLogicCond(SNode **pCondition, SNode **pPrimaryKeyCond) { + SLogicConditionNode *pLogicCond = (SLogicConditionNode 
*)(*pCondition); + int32_t code = TSDB_CODE_SUCCESS; + SNodeList *pPrimaryKeyConds = NULL; + SNode *pCond = NULL; + WHERE_EACH(pCond, pLogicCond->pParameterList) { + if (isCondColumnsFromMultiTable(pCond) || COND_TYPE_PRIMARY_KEY != classifyCondition(pCond)) { + WHERE_NEXT; + } + + code = nodesListMakeAppend(&pPrimaryKeyConds, nodesCloneNode(pCond)); + if (TSDB_CODE_SUCCESS != code) { + break; + } + + ERASE_NODE(pLogicCond->pParameterList); + } + + SNode *pTempPrimaryKeyCond = NULL; + if (TSDB_CODE_SUCCESS == code && pPrimaryKeyConds) { + code = nodesMergeConds(&pTempPrimaryKeyCond, &pPrimaryKeyConds); + } + + if (TSDB_CODE_SUCCESS == code && pTempPrimaryKeyCond) { + *pPrimaryKeyCond = pTempPrimaryKeyCond; + + if (pLogicCond->pParameterList->length <= 0) { + nodesDestroyNode(*pCondition); + *pCondition = NULL; + } + } else { + nodesDestroyList(pPrimaryKeyConds); + nodesDestroyNode(pTempPrimaryKeyCond); + } + + return code; +} + + +int32_t hashJoinOptSplitPrimCond(SNode **pCondition, SNode **pPrimaryKeyCond) { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pCondition)) { + if (LOGIC_COND_TYPE_AND == ((SLogicConditionNode *)*pCondition)->condType) { + return hashJoinOptSplitPrimFromLogicCond(pCondition, pPrimaryKeyCond); + } + + return TSDB_CODE_SUCCESS; + } + + bool needOutput = false; + if (isCondColumnsFromMultiTable(*pCondition)) { + return TSDB_CODE_SUCCESS; + } + + EConditionType type = classifyCondition(*pCondition); + if (COND_TYPE_PRIMARY_KEY == type) { + *pPrimaryKeyCond = *pCondition; + *pCondition = NULL; + } + + return TSDB_CODE_SUCCESS; +} + + +static int32_t hashJoinOptRewriteJoin(SOptimizeContext* pCxt, SLogicNode* pNode, SLogicSubplan* pLogicSubplan) { + SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode; + int32_t code = TSDB_CODE_SUCCESS; + + pJoin->joinAlgo = JOIN_ALGO_HASH; + + if (NULL != pJoin->pColOnCond) { + EJoinType t = pJoin->joinType; + EJoinSubType s = pJoin->subType; + + pJoin->joinType = JOIN_TYPE_INNER; + pJoin->subType = JOIN_STYPE_NONE; 
+ + code = pdcJoinSplitCond(pJoin, &pJoin->pColOnCond, NULL, &pJoin->pLeftOnCond, &pJoin->pRightOnCond, true); + + pJoin->joinType = t; + pJoin->subType = s; + + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + STimeWindow ltimeRange = TSWINDOW_INITIALIZER; + STimeWindow rtimeRange = TSWINDOW_INITIALIZER; + SNode* pPrimaryKeyCond = NULL; + if (NULL != pJoin->pLeftOnCond) { + hashJoinOptSplitPrimCond(&pJoin->pLeftOnCond, &pPrimaryKeyCond); + if (NULL != pPrimaryKeyCond) { + bool isStrict = false; + code = getTimeRangeFromNode(&pPrimaryKeyCond, &ltimeRange, &isStrict); + nodesDestroyNode(pPrimaryKeyCond); + } + } + + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + if (NULL != pJoin->pRightOnCond) { + pPrimaryKeyCond = NULL; + hashJoinOptSplitPrimCond(&pJoin->pRightOnCond, &pPrimaryKeyCond); + if (NULL != pPrimaryKeyCond) { + bool isStrict = false; + code = getTimeRangeFromNode(&pPrimaryKeyCond, &rtimeRange, &isStrict); + nodesDestroyNode(pPrimaryKeyCond); + } + } + + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + if (TSWINDOW_IS_EQUAL(ltimeRange, TSWINDOW_INITIALIZER)) { + pJoin->timeRange.skey = rtimeRange.skey; + pJoin->timeRange.ekey = rtimeRange.ekey; + } else if (TSWINDOW_IS_EQUAL(rtimeRange, TSWINDOW_INITIALIZER)) { + pJoin->timeRange.skey = ltimeRange.skey; + pJoin->timeRange.ekey = ltimeRange.ekey; + } else if (ltimeRange.ekey < rtimeRange.skey || ltimeRange.skey > rtimeRange.ekey) { + pJoin->timeRange = TSWINDOW_DESC_INITIALIZER; + } else { + pJoin->timeRange.skey = TMAX(ltimeRange.skey, rtimeRange.skey); + pJoin->timeRange.ekey = TMIN(ltimeRange.ekey, rtimeRange.ekey); + } + } + + if (NULL != pJoin->pTagOnCond && !TSWINDOW_IS_EQUAL(pJoin->timeRange, TSWINDOW_DESC_INITIALIZER)) { + EJoinType t = pJoin->joinType; + EJoinSubType s = pJoin->subType; + SNode* pLeftChildCond = NULL; + SNode* pRightChildCond = NULL; + + pJoin->joinType = JOIN_TYPE_INNER; + pJoin->subType = JOIN_STYPE_NONE; + + code = pdcJoinSplitCond(pJoin,
&pJoin->pTagOnCond, NULL, &pLeftChildCond, &pRightChildCond, true); + + pJoin->joinType = t; + pJoin->subType = s; + + if (TSDB_CODE_SUCCESS == code) { + code = mergeJoinConds(&pJoin->pLeftOnCond, &pLeftChildCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = mergeJoinConds(&pJoin->pRightOnCond, &pRightChildCond); + } + + nodesDestroyNode(pLeftChildCond); + nodesDestroyNode(pRightChildCond); + + if (TSDB_CODE_SUCCESS != code) { + return code; + } + } + + if (!TSWINDOW_IS_EQUAL(pJoin->timeRange, TSWINDOW_DESC_INITIALIZER)) { + FOREACH(pNode, pJoin->node.pChildren) { + if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) { + continue; + } + + SScanLogicNode* pScan = (SScanLogicNode*)pNode; + if (TSWINDOW_IS_EQUAL(pScan->scanRange, TSWINDOW_INITIALIZER)) { + continue; + } else if (pJoin->timeRange.ekey < pScan->scanRange.skey || pJoin->timeRange.skey > pScan->scanRange.ekey) { + pJoin->timeRange = TSWINDOW_DESC_INITIALIZER; + break; + } else { + pJoin->timeRange.skey = TMAX(pJoin->timeRange.skey, pScan->scanRange.skey); + pJoin->timeRange.ekey = TMIN(pJoin->timeRange.ekey, pScan->scanRange.ekey); + } + } + } + + pCxt->optimized = true; + OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_STB_JOIN); + + return TSDB_CODE_SUCCESS; +} + + +static int32_t hashJoinOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { + SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, hashJoinOptShouldBeOptimized); + if (NULL == pNode) { + return TSDB_CODE_SUCCESS; + } + + return hashJoinOptRewriteJoin(pCxt, pNode, pLogicSubplan); +} + static bool stbJoinOptShouldBeOptimized(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode) || OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_STB_JOIN)) { return false; @@ -4858,7 +5082,7 @@ static int32_t stbJoinOptCreateTagHashJoinNode(SLogicNode* pOrig, SNodeList* pCh pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; pJoin->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; 
pJoin->pTagEqCond = nodesCloneNode(pOrigJoin->pTagEqCond); - pJoin->pFullOnCond = nodesCloneNode(pOrigJoin->pTagOnCond); + pJoin->pTagOnCond = nodesCloneNode(pOrigJoin->pTagOnCond); int32_t code = TSDB_CODE_SUCCESS; pJoin->node.pChildren = pChildren; @@ -5404,6 +5628,7 @@ static const SOptimizeRule optimizeRuleSet[] = { {.pName = "ScanPath", .optimizeFunc = scanPathOptimize}, {.pName = "PushDownCondition", .optimizeFunc = pdcOptimize}, {.pName = "JoinCondOptimize", .optimizeFunc = joinCondOptimize}, + {.pName = "HashJoin", .optimizeFunc = hashJoinOptimize}, {.pName = "StableJoin", .optimizeFunc = stableJoinOptimize}, {.pName = "GroupJoin", .optimizeFunc = groupJoinOptimize}, {.pName = "sortNonPriKeyOptimize", .optimizeFunc = sortNonPriKeyOptimize}, diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 8decf3d480..c584f7527e 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1144,6 +1144,8 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil pJoin->pWindowOffset = nodesCloneNode(pJoinLogicNode->pWindowOffset); pJoin->pJLimit = nodesCloneNode(pJoinLogicNode->pJLimit); pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder; + pJoin->timeRange.skey = pJoinLogicNode->timeRange.skey; + pJoin->timeRange.ekey = pJoinLogicNode->timeRange.ekey; code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pJoin->pPrimKeyCond); if (TSDB_CODE_SUCCESS == code) { @@ -1152,8 +1154,18 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil if (TSDB_CODE_SUCCESS == code) { code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqCond, &pJoin->pTagEqCond); } - if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pFullOnCond) { - code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, 
pJoinLogicNode->pFullOnCond, &pJoin->pFilterConditions); + if (TSDB_CODE_SUCCESS == code) { + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, -1, pJoinLogicNode->pLeftOnCond, &pJoin->pLeftOnCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = setNodeSlotId(pCxt, -1, pRightDesc->dataBlockId, pJoinLogicNode->pRightOnCond, &pJoin->pRightOnCond); + } + if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColOnCond) || (NULL != pJoinLogicNode->pTagOnCond))) { + code = mergeJoinConds(&pJoinLogicNode->pColOnCond, &pJoinLogicNode->pTagOnCond); + } + SNode* pOnCond = (NULL != pJoinLogicNode->pColOnCond) ? pJoinLogicNode->pColOnCond : pJoinLogicNode->pTagOnCond; + if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) { + code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pOnCond, &pJoin->pFullOnCond); } if (TSDB_CODE_SUCCESS == code) { code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets, &pJoin->pTargets); diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index a0e4f36e15..a7c21092c6 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -490,6 +490,19 @@ bool getSmallDataTsSortOptHint(SNodeList* pList) { return false; } +bool getHashJoinOptHint(SNodeList* pList) { + if (!pList) return false; + SNode* pNode; + FOREACH(pNode, pList) { + SHintNode* pHint = (SHintNode*)pNode; + if (pHint->option == HINT_HASH_JOIN) { + return true; + } + } + return false; +} + + int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes) { int32_t code = TSDB_CODE_SUCCESS; SLogicNode* pCurr = (SLogicNode*)pNode; @@ -589,4 +602,14 @@ SFunctionNode* createGroupKeyAggFunc(SColumnNode* pGroupCol) { return pFunc; } +int32_t getTimeRangeFromNode(SNode** pPrimaryKeyCond, STimeWindow* pTimeRange, bool* pIsStrict) { + SNode* pNew = NULL; + int32_t code = scalarCalculateConstants(*pPrimaryKeyCond, &pNew); + if (TSDB_CODE_SUCCESS 
== code) { + *pPrimaryKeyCond = pNew; + code = filterGetTimeRange(*pPrimaryKeyCond, pTimeRange, pIsStrict); + } + return code; +} +