diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index d2d58e3da6..d0ab8e7974 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -151,9 +151,10 @@ typedef struct SJoinLogicNode { bool hashJoinHint; // FOR HASH JOIN - STimeWindow timeRange; //table onCond filter - SNode* pLeftOnCond; //table onCond filter - SNode* pRightOnCond; //table onCond filter + int32_t timeRangeTarget; //table onCond filter + STimeWindow timeRange; //table onCond filter + SNode* pLeftOnCond; //table onCond filter + SNode* pRightOnCond; //table onCond filter } SJoinLogicNode; typedef struct SAggLogicNode { @@ -527,10 +528,15 @@ typedef struct SHashJoinPhysiNode { SNode* pJLimit; SNodeList* pOnLeft; SNodeList* pOnRight; - STimeWindow timeRange; //table onCond filter - SNode* pLeftOnCond; //table onCond filter - SNode* pRightOnCond; //table onCond filter - SNode* pFullOnCond; //preFilter + SNode* leftPrimExpr; + SNode* rightPrimExpr; + int32_t leftPrimSlotId; + int32_t rightPrimSlotId; + int32_t timeRangeTarget; //table onCond filter + STimeWindow timeRange; //table onCond filter + SNode* pLeftOnCond; //table onCond filter + SNode* pRightOnCond; //table onCond filter + SNode* pFullOnCond; //preFilter SNodeList* pTargets; SQueryStat inputStat[2]; diff --git a/include/libs/scalar/filter.h b/include/libs/scalar/filter.h index adabe6d67c..c1ce1e6fd8 100644 --- a/include/libs/scalar/filter.h +++ b/include/libs/scalar/filter.h @@ -31,6 +31,15 @@ enum { FLT_OPTION_NEED_UNIQE = 4, }; + +typedef enum EConditionType { + COND_TYPE_PRIMARY_KEY = 1, + COND_TYPE_TAG_INDEX, + COND_TYPE_TAG, + COND_TYPE_NORMAL +} EConditionType; + + #define FILTER_RESULT_ALL_QUALIFIED 0x1 #define FILTER_RESULT_NONE_QUALIFIED 0x2 #define FILTER_RESULT_PARTIAL_QUALIFIED 0x3 @@ -54,6 +63,8 @@ extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pColsAgg, /* condition split interface */ int32_t filterPartitionCond(SNode **pCondition, SNode **pPrimaryKeyCond, SNode **pTagIndexCond, SNode **pTagCond, SNode **pOtherCond); +bool filterIsMultiTableColsCond(SNode *pCond); +EConditionType filterClassifyCondition(SNode *pNode); #ifdef __cplusplus } diff --git a/source/libs/command/inc/commandInt.h b/source/libs/command/inc/commandInt.h index 3178515706..a583f21117 100644 --- a/source/libs/command/inc/commandInt.h +++ b/source/libs/command/inc/commandInt.h @@ -78,6 +78,7 @@ extern "C" { #define EXPLAIN_JOIN_EQ_RIGHT_FORMAT "Right Equal Cond: " #define EXPLAIN_COUNT_NUM_FORMAT "Window Count=%" PRId64 #define EXPLAIN_COUNT_SLIDING_FORMAT "Window Sliding=%" PRId64 +#define EXPLAIN_TABLE_TIMERANGE_FORMAT "%s Table Time Range: [%" PRId64 ", %" PRId64 "]" #define EXPLAIN_PLANNING_TIME_FORMAT "Planning Time: %.3f ms" #define EXPLAIN_EXEC_TIME_FORMAT "Execution Time: %.3f ms" diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index fbe77b7987..5afc629865 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -61,6 +61,15 @@ char* qExplainGetAsofOpStr(int32_t opType) { } } +char* qExplainGetTimerangeTargetStr(int32_t target) { + static char* targetName[] = {"", "Left", "Right", "Left/Right"}; + if (target <= 0 || target > 3) { + return "Unknown"; + } + + return targetName[target]; +} + void qExplainFreeResNode(SExplainResNode *resNode) { if (NULL == resNode) { @@ -1700,19 +1709,11 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - if (pJoinNode->node.pConditions || pJoinNode->pFullOnCond) { + if (pJoinNode->node.pConditions) { EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - if (pJoinNode->node.pConditions) { - QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE, - TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); - } - if (pJoinNode->pFullOnCond) { - if (pJoinNode->node.pConditions) { - EXPLAIN_ROW_APPEND(" AND "); - } - QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pFullOnCond, tbuf + VARSTR_HEADER_SIZE, - TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); - } + QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE, + TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } @@ -1740,8 +1741,21 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i nodesNodeToSQL(pJoinNode->pTagEqCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); conditionsGot = true; } + if (pJoinNode->pFullOnCond) { + if (conditionsGot) { + EXPLAIN_ROW_APPEND(" AND "); + } + QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pFullOnCond, tbuf + VARSTR_HEADER_SIZE, + TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + } EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + + if (pJoinNode->timeRangeTarget) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TABLE_TIMERANGE_FORMAT, qExplainGetTimerangeTargetStr(pJoinNode->timeRangeTarget), pJoinNode->timeRange.skey, pJoinNode->timeRange.ekey); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } } break; } diff --git a/source/libs/executor/inc/hashjoin.h b/source/libs/executor/inc/hashjoin.h index 78fea8b792..1085f2236c 100755 --- a/source/libs/executor/inc/hashjoin.h +++ b/source/libs/executor/inc/hashjoin.h @@ -25,6 +25,9 @@ extern "C" { #define HJOIN_ROW_BITMAP_SIZE (2 * 1048576) #define HJOIN_BLK_THRESHOLD_RATIO 0.9 +typedef int32_t (*hJoinImplFp)(SOperatorInfo*); + + #pragma pack(push, 1) typedef struct SBufRowInfo { void* next; @@ -33,12 +36,24 @@ typedef struct SBufRowInfo { } SBufRowInfo; #pragma pack(pop) +typedef enum EHJoinPhase { + E_JOIN_PHASE_PRE = 1, + E_JOIN_PHASE_CUR, + E_JOIN_PHASE_POST +} EHJoinPhase; + typedef struct SHJoinCtx { bool rowRemains; + bool midRemains; int64_t limit; SBufRowInfo* pBuildRow; SSDataBlock* pProbeData; - int32_t probeIdx; + EHJoinPhase probePhase; + int32_t probePreIdx; + int32_t probeStartIdx; + int32_t probeEndIdx; + int32_t probePostIdx; + bool readMatch; } SHJoinCtx; typedef struct SHJoinColInfo { @@ -64,11 +79,31 @@ typedef struct SGroupData { SBufRowInfo* rows; } SGroupData; -typedef struct SHJoinTableInfo { + +typedef struct SHJoinColMap { + int32_t srcSlot; + int32_t dstSlot; + bool vardata; + int32_t bytes; +} SHJoinColMap; + +// for now timetruncate only +typedef struct SHJoinPrimExprCtx { + int64_t truncateUnit; + int64_t timezoneUnit; + int32_t targetSlotId; +} SHJoinPrimExprCtx; + +typedef struct SHJoinTableCtx { int32_t downStreamIdx; SOperatorInfo* downStream; int32_t blkId; SQueryStat inputStat; + bool hasTimeRange; + + SHJoinColMap* primCol; + SNode* primExpr; + SHJoinPrimExprCtx primCtx; int32_t keyNum; SHJoinColInfo* keyCols; @@ -82,7 +117,7 @@ typedef struct SHJoinTableInfo { int32_t valBufSize; SArray* valVarCols; bool valColExist; -} SHJoinTableInfo; +} SHJoinTableCtx; typedef struct SHJoinExecInfo { int64_t buildBlkNum; @@ -95,14 +130,16 @@ typedef struct SHJoinExecInfo { typedef struct SHJoinOperatorInfo { - int32_t joinType; - SHJoinTableInfo tbs[2]; - SHJoinTableInfo* pBuild; - SHJoinTableInfo* pProbe; + EJoinType joinType; + EJoinSubType subType; + SHJoinTableCtx tbs[2]; + SHJoinTableCtx* pBuild; + SHJoinTableCtx* pProbe; SFilterInfo* pPreFilter; SFilterInfo* pFinFilter; SSDataBlock* finBlk; SSDataBlock* midBlk; + STimeWindow tblTimeRange; int32_t pResColNum; int8_t* pResColMap; SArray* pRowBufs; @@ -111,6 +148,7 @@ typedef struct SHJoinOperatorInfo { SHJoinCtx ctx; SHJoinExecInfo execInfo; int32_t blkThreshold; + hJoinImplFp joinFp; } SHJoinOperatorInfo; @@ -132,6 +170,16 @@ typedef struct SHJoinOperatorInfo { } \ } while (0) +int32_t hInnerJoinDo(struct SOperatorInfo* pOperator); +int32_t hLeftJoinDo(struct SOperatorInfo* pOperator); +void hJoinSetDone(struct SOperatorInfo* pOperator); +void hJoinAppendResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes, bool* allFetched); +bool hJoinCopyKeyColsDataToBuf(SHJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen); +int32_t hJoinCopyMergeMidBlk(SHJoinCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin); +int32_t hJoinHandleMidRemains(SHJoinOperatorInfo* pJoin, SHJoinCtx* pCtx); +bool hJoinBlkReachThreshold(SHJoinOperatorInfo* pInfo, int64_t blkRows); +int32_t hJoinCopyNMatchRowsToBlock(SHJoinOperatorInfo* pJoin, SSDataBlock* pRes, int32_t startIdx, int32_t rows); + #ifdef __cplusplus } diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 5f851af0dc..f6f1cf26f3 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -30,6 +30,7 @@ extern "C" { #define MJOIN_BLK_SIZE_LIMIT 10485760 #define MJOIN_ROW_BITMAP_SIZE (2 * 1048576) #endif +#define MJOIN_SEMI_ANTI_BLK_ROWS_NUM 64 #define MJOIN_BLK_THRESHOLD_RATIO 0.9 struct SMJoinOperatorInfo; diff --git a/source/libs/executor/src/hashjoin.c b/source/libs/executor/src/hashjoin.c index cfcc8ece50..c22b331a16 100755 --- a/source/libs/executor/src/hashjoin.c +++ b/source/libs/executor/src/hashjoin.c @@ -29,3 +29,321 @@ +int32_t hInnerJoinDo(struct SOperatorInfo* pOperator) { + SHJoinOperatorInfo* pJoin = pOperator->info; + SHJoinTableCtx* pProbe = pJoin->pProbe; + SHJoinCtx* pCtx = &pJoin->ctx; + SSDataBlock* pRes = pJoin->finBlk; + size_t bufLen = 0; + int32_t code = 0; + bool allFetched = false; + + if (pJoin->ctx.pBuildRow) { + hJoinAppendResToBlock(pOperator, pRes, &allFetched); + if (pRes->info.rows >= pRes->info.capacity) { + if (allFetched) { + ++pCtx->probeStartIdx; + } + + return code; + } else { + ++pCtx->probeStartIdx; + } + } + + for (; pCtx->probeStartIdx <= pCtx->probeEndIdx; ++pCtx->probeStartIdx) { + if (hJoinCopyKeyColsDataToBuf(pProbe, pCtx->probeStartIdx, &bufLen)) { + continue; + } + + SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen); +/* + size_t keySize = 0; + int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize); + ASSERT(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen)); + int64_t rows = getSingleKeyRowsNum(pGroup->rows); + pJoin->execInfo.expectRows += rows; + qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows); +*/ + if (pGroup) { + pCtx->pBuildRow = pGroup->rows; + hJoinAppendResToBlock(pOperator, pRes, &allFetched); + if (pRes->info.rows >= pRes->info.capacity) { + if (allFetched) { + ++pCtx->probeStartIdx; + } + + return code; + } + } + } + + pCtx->rowRemains = false; + + return code; +} + +int32_t hLeftJoinHandleSeqRowRemains(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) { + bool allFetched = false; + SHJoinCtx* pCtx = &pJoin->ctx; + + while (!allFetched) { + hJoinAppendResToBlock(pOperator, pJoin->midBlk, &allFetched); + if (pJoin->midBlk->info.rows > 0) { + doFilter(pJoin->midBlk, pJoin->pPreFilter, NULL); + if (pJoin->midBlk->info.rows > 0) { + pCtx->readMatch = true; + HJ_ERR_RET(hJoinCopyMergeMidBlk(pCtx, &pJoin->midBlk, &pJoin->finBlk)); + + if (pCtx->midRemains) { + if (allFetched) { + ++pCtx->probeStartIdx; + } + + *loopCont = false; + return TSDB_CODE_SUCCESS; + } + } + } + + if (allFetched && !pCtx->readMatch) { + HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pCtx->probeStartIdx, 1)); + } + + if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) { + if (allFetched) { + ++pCtx->probeStartIdx; + } + + *loopCont = false; + return TSDB_CODE_SUCCESS; + } + } + + ++pCtx->probeStartIdx; + *loopCont = true; + + return TSDB_CODE_SUCCESS; +} + +int32_t hLeftJoinHandleSeqProbeRows(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) { + SHJoinTableCtx* pProbe = pJoin->pProbe; + SHJoinCtx* pCtx = &pJoin->ctx; + size_t bufLen = 0; + bool allFetched = false; + + if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) { + return TSDB_CODE_SUCCESS; + } + + for (; pCtx->probeStartIdx <= pCtx->probeEndIdx; ++pCtx->probeStartIdx) { + if (hJoinCopyKeyColsDataToBuf(pProbe, pCtx->probeStartIdx, &bufLen)) { + continue; + } + + SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen); +/* + size_t keySize = 0; + int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize); + ASSERT(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen)); + int64_t rows = getSingleKeyRowsNum(pGroup->rows); + pJoin->execInfo.expectRows += rows; + qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows); +*/ + + if (NULL == pGroup) { + HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pCtx->probeStartIdx, 1)); + if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) { + ++pCtx->probeStartIdx; + *loopCont = false; + + return TSDB_CODE_SUCCESS; + } + + continue; + } + + pCtx->readMatch = false; + pCtx->pBuildRow = pGroup->rows; + allFetched = false; + + while (!allFetched) { + hJoinAppendResToBlock(pOperator, pJoin->midBlk, &allFetched); + if (pJoin->midBlk->info.rows > 0) { + doFilter(pJoin->midBlk, pJoin->pPreFilter, NULL); + if (pJoin->midBlk->info.rows > 0) { + pCtx->readMatch = true; + HJ_ERR_RET(hJoinCopyMergeMidBlk(pCtx, &pJoin->midBlk, &pJoin->finBlk)); + + if (pCtx->midRemains) { + if (allFetched) { + ++pCtx->probeStartIdx; + } + + *loopCont = false; + + return TSDB_CODE_SUCCESS; + } + } + } + + if (allFetched && !pCtx->readMatch) { + HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pCtx->probeStartIdx, 1)); + } + + if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) { + if (allFetched) { + ++pCtx->probeStartIdx; + } + + *loopCont = false; + + return TSDB_CODE_SUCCESS; + } + } + } + + pCtx->probePhase = E_JOIN_PHASE_POST; + *loopCont = true; + + return TSDB_CODE_SUCCESS; +} + + +int32_t hLeftJoinHandleRowRemains(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) { + bool allFetched = false; + SHJoinCtx* pCtx = &pJoin->ctx; + + hJoinAppendResToBlock(pOperator, pJoin->finBlk, &allFetched); + + if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) { + if (allFetched) { + ++pCtx->probeStartIdx; + } + + *loopCont = false; + return TSDB_CODE_SUCCESS; + } else { + ++pCtx->probeStartIdx; + } + + *loopCont = true; + + return TSDB_CODE_SUCCESS; +} + + +int32_t hLeftJoinHandleProbeRows(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) { + SHJoinTableCtx* pProbe = pJoin->pProbe; + SHJoinCtx* pCtx = &pJoin->ctx; + size_t bufLen = 0; + bool allFetched = false; + + for (; pCtx->probeStartIdx <= pCtx->probeEndIdx; ++pCtx->probeStartIdx) { + if (hJoinCopyKeyColsDataToBuf(pProbe, pCtx->probeStartIdx, &bufLen)) { + continue; + } + + SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen); +/* + size_t keySize = 0; + int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize); + ASSERT(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen)); + int64_t rows = getSingleKeyRowsNum(pGroup->rows); + pJoin->execInfo.expectRows += rows; + qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows); +*/ + + if (NULL == pGroup) { + HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pCtx->probeStartIdx, 1)); + if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) { + ++pCtx->probeStartIdx; + *loopCont = false; + + return TSDB_CODE_SUCCESS; + } + + continue; + } + + pCtx->pBuildRow = pGroup->rows; + + hJoinAppendResToBlock(pOperator, pJoin->finBlk, &allFetched); + if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) { + if (allFetched) { + ++pCtx->probeStartIdx; + } + *loopCont = false; + + return TSDB_CODE_SUCCESS; + } + } + + pCtx->probePhase = E_JOIN_PHASE_POST; + *loopCont = true; + + return TSDB_CODE_SUCCESS; +} + + + +int32_t hLeftJoinDo(struct SOperatorInfo* pOperator) { + SHJoinOperatorInfo* pJoin = pOperator->info; + SHJoinCtx* pCtx = &pJoin->ctx; + + while (pCtx->rowRemains) { + switch (pCtx->probePhase) { + case E_JOIN_PHASE_PRE: { + int32_t rows = pCtx->probeStartIdx - pCtx->probePreIdx; + int32_t rowsLeft = pJoin->finBlk->info.capacity - pJoin->finBlk->info.rows; + if (rows <= rowsLeft) { + HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, 0, rows)); + pCtx->probePhase = E_JOIN_PHASE_CUR; + } else { + HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, 0, rowsLeft)); + pJoin->ctx.probePreIdx += rowsLeft; + + return TSDB_CODE_SUCCESS; + } + break; + } + case E_JOIN_PHASE_CUR: { + bool loopCont = false; + if (NULL == pJoin->ctx.pBuildRow) { + HJ_ERR_RET(pJoin->pPreFilter ? hLeftJoinHandleSeqProbeRows(pOperator, pJoin, &loopCont) : hLeftJoinHandleProbeRows(pOperator, pJoin, &loopCont)); + } else { + HJ_ERR_RET(pJoin->pPreFilter ? hLeftJoinHandleSeqRowRemains(pOperator, pJoin, &loopCont) : hLeftJoinHandleRowRemains(pOperator, pJoin, &loopCont)); + } + + if (!loopCont) { + return TSDB_CODE_SUCCESS; + } + break; + } + case E_JOIN_PHASE_POST: { + if (pCtx->probeEndIdx < (pCtx->pProbeData->info.rows - 1) && pCtx->probePostIdx <= (pCtx->pProbeData->info.rows - 1)) { + int32_t rowsLeft = pJoin->finBlk->info.capacity - pJoin->finBlk->info.rows; + int32_t rows = pCtx->pProbeData->info.rows - pCtx->probePostIdx; + if (rows <= rowsLeft) { + HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pJoin->ctx.probePostIdx, rows)); + pCtx->rowRemains = false; + } else { + HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pJoin->ctx.probePostIdx, rowsLeft)); + pCtx->probePostIdx += rowsLeft; + + return TSDB_CODE_SUCCESS; + } + } else { + pJoin->ctx.rowRemains = false; + } + break; + } + default: + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + } + + return TSDB_CODE_SUCCESS; +} + + diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 610c4be61a..6e3a929df5 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -26,6 +26,106 @@ #include "tmsg.h" #include "ttypes.h" #include "hashjoin.h" +#include "functionMgt.h" + + +bool hJoinBlkReachThreshold(SHJoinOperatorInfo* pInfo, int64_t blkRows) { + if (INT64_MAX == pInfo->ctx.limit || pInfo->pFinFilter != NULL) { + return blkRows >= pInfo->blkThreshold; + } + + return (pInfo->execInfo.resRows + blkRows) >= pInfo->ctx.limit; +} + +int32_t hJoinHandleMidRemains(SHJoinOperatorInfo* pJoin, SHJoinCtx* pCtx) { + ASSERT(0 < pJoin->midBlk->info.rows); + + TSWAP(pJoin->midBlk, pJoin->finBlk); + + pCtx->midRemains = false; + + return TSDB_CODE_SUCCESS; +} + +int32_t hJoinCopyMergeMidBlk(SHJoinCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) { + SSDataBlock* pLess = *ppMid; + SSDataBlock* pMore = *ppFin; + +/* + if ((*ppMid)->info.rows < (*ppFin)->info.rows) { + pLess = (*ppMid); + pMore = (*ppFin); + } else { + pLess = (*ppFin); + pMore = (*ppMid); + } +*/ + + int32_t totalRows = pMore->info.rows + pLess->info.rows; + if (totalRows <= pMore->info.capacity) { + HJ_ERR_RET(blockDataMerge(pMore, pLess)); + blockDataCleanup(pLess); + pCtx->midRemains = false; + } else { + int32_t copyRows = pMore->info.capacity - pMore->info.rows; + HJ_ERR_RET(blockDataMergeNRows(pMore, pLess, pLess->info.rows - copyRows, copyRows)); + blockDataShrinkNRows(pLess, copyRows); + pCtx->midRemains = true; + } + +/* + if (pMore != (*ppFin)) { + TSWAP(*ppMid, *ppFin); + } +*/ + + return TSDB_CODE_SUCCESS; +} + +int32_t hJoinSetImplFp(SHJoinOperatorInfo* pJoin) { + switch (pJoin->joinType) { + case JOIN_TYPE_INNER: + pJoin->joinFp = hInnerJoinDo; + break; + case JOIN_TYPE_LEFT: + case JOIN_TYPE_RIGHT: { + switch (pJoin->subType) { + case JOIN_STYPE_OUTER: + pJoin->joinFp = hLeftJoinDo; + break; + default: + break; + } + break; + } + default: + break; + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t hJoinLaunchPrimExpr(SSDataBlock* pBlock, SHJoinTableCtx* pTable, int32_t startIdx, int32_t endIdx) { + if (NULL == pTable->primExpr) { + return TSDB_CODE_SUCCESS; + } + + SHJoinPrimExprCtx* pCtx = &pTable->primCtx; + SColumnInfoData* pPrimIn = taosArrayGet(pBlock->pDataBlock, pTable->primCol->srcSlot); + SColumnInfoData* pPrimOut = taosArrayGet(pBlock->pDataBlock, pTable->primCtx.targetSlotId); + if (0 != pCtx->timezoneUnit) { + for (int32_t i = startIdx; i <= endIdx; ++i) { + ((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] / pCtx->truncateUnit * pCtx->truncateUnit - pCtx->timezoneUnit; + } + } else { + for (int32_t i = startIdx; i <= endIdx; ++i) { + ((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] / pCtx->truncateUnit * pCtx->truncateUnit; + } + } + + return TSDB_CODE_SUCCESS; +} static int64_t hJoinGetSingleKeyRowsNum(SBufRowInfo* pRow) { @@ -52,7 +152,7 @@ static int64_t hJoinGetRowsNumOfKeyHash(SSHashObj* pHash) { return rowsNum; } -static int32_t hJoinInitKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { +static int32_t hJoinInitKeyColsInfo(SHJoinTableCtx* pTable, SNodeList* pList) { pTable->keyNum = LIST_LENGTH(pList); pTable->keyCols = taosMemoryMalloc(pTable->keyNum * sizeof(SHJoinColInfo)); @@ -106,7 +206,7 @@ static bool hJoinIsValColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo return false; } -static int32_t hJoinInitValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { +static int32_t hJoinInitValColsInfo(SHJoinTableCtx* pTable, SNodeList* pList) { hJoinGetValColsNum(pList, pTable->blkId, &pTable->valNum); if (pTable->valNum == 0) { return TSDB_CODE_SUCCESS; @@ -157,17 +257,71 @@ static int32_t hJoinInitValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { return TSDB_CODE_SUCCESS; } +static int32_t hJoinInitPrimKeyInfo(SHJoinTableCtx* pTable, int32_t slotId) { + pTable->primCol = taosMemoryMalloc(sizeof(SHJoinColMap)); + if (NULL == pTable->primCol) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pTable->primCol->srcSlot = slotId; + + return TSDB_CODE_SUCCESS; +} + + +static int32_t hJoinInitPrimExprCtx(SNode* pNode, SHJoinPrimExprCtx* pCtx, SHJoinTableCtx* pTable) { + if (NULL == pNode) { + pCtx->targetSlotId = pTable->primCol->srcSlot; + return TSDB_CODE_SUCCESS; + } + + if (QUERY_NODE_TARGET != nodeType(pNode)) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + + STargetNode* pTarget = (STargetNode*)pNode; + if (QUERY_NODE_FUNCTION != nodeType(pTarget->pExpr)) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + + SFunctionNode* pFunc = (SFunctionNode*)pTarget->pExpr; + if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + + if (4 != pFunc->pParameterList->length && 5 != pFunc->pParameterList->length) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + + SValueNode* pUnit = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); + SValueNode* pCurrTz = (5 == pFunc->pParameterList->length) ? (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2) : NULL; + SValueNode* pTimeZone = (5 == pFunc->pParameterList->length) ? (SValueNode*)nodesListGetNode(pFunc->pParameterList, 4) : (SValueNode*)nodesListGetNode(pFunc->pParameterList, 3); + + pCtx->truncateUnit = pUnit->typeData; + if ((NULL == pCurrTz || 1 == pCurrTz->typeData) && pCtx->truncateUnit >= (86400 * TSDB_TICK_PER_SECOND(pFunc->node.resType.precision))) { + pCtx->timezoneUnit = offsetFromTz(varDataVal(pTimeZone->datum.p), TSDB_TICK_PER_SECOND(pFunc->node.resType.precision)); + } + + pCtx->targetSlotId = pTarget->slotId; + + return TSDB_CODE_SUCCESS; +} + static int32_t hJoinInitTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) { SNodeList* pKeyList = NULL; - SHJoinTableInfo* pTable = &pJoin->tbs[idx]; + SHJoinTableCtx* pTable = &pJoin->tbs[idx]; pTable->downStream = pDownstream[idx]; pTable->blkId = pDownstream[idx]->resultDataBlockId; if (0 == idx) { pKeyList = pJoinNode->pOnLeft; + pTable->hasTimeRange = pJoinNode->timeRangeTarget & 0x1; } else { pKeyList = pJoinNode->pOnRight; + pTable->hasTimeRange = pJoinNode->timeRangeTarget & 0x2; } + + HJ_ERR_RET(hJoinInitPrimKeyInfo(pTable, (0 == idx) ? pJoinNode->leftPrimSlotId : pJoinNode->rightPrimSlotId)); int32_t code = hJoinInitKeyColsInfo(pTable, pKeyList); if (code) { @@ -180,6 +334,8 @@ static int32_t hJoinInitTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* memcpy(&pTable->inputStat, pStat, sizeof(*pStat)); + HJ_ERR_RET(hJoinInitPrimExprCtx(pTable->primExpr, &pTable->primCtx, pTable)); + return TSDB_CODE_SUCCESS; } @@ -188,9 +344,11 @@ static void hJoinSetBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysi int32_t probeIdx = 1; pInfo->joinType = pJoinNode->joinType; + pInfo->subType = pJoinNode->subType; switch (pInfo->joinType) { case JOIN_TYPE_INNER: + case JOIN_TYPE_FULL: if (pInfo->tbs[0].inputStat.inputRowNum <= pInfo->tbs[1].inputStat.inputRowNum) { buildIdx = 0; probeIdx = 1; @@ -216,6 +374,14 @@ static void hJoinSetBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysi pInfo->pBuild->downStreamIdx = buildIdx; pInfo->pProbe->downStreamIdx = probeIdx; + + if (0 == buildIdx) { + pInfo->pBuild->primExpr = pJoinNode->leftPrimExpr; + pInfo->pProbe->primExpr = pJoinNode->rightPrimExpr; + } else { + pInfo->pBuild->primExpr = pJoinNode->rightPrimExpr; + pInfo->pProbe->primExpr = pJoinNode->leftPrimExpr; + } } static int32_t hJoinBuildResColsMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { @@ -263,11 +429,12 @@ static int32_t hJoinInitBufPages(SHJoinOperatorInfo* pInfo) { return hJoinAddPageToBufs(pInfo->pRowBufs); } -static void hJoinFreeTableInfo(SHJoinTableInfo* pTable) { +static void hJoinFreeTableInfo(SHJoinTableCtx* pTable) { taosMemoryFreeClear(pTable->keyCols); taosMemoryFreeClear(pTable->keyBuf); taosMemoryFreeClear(pTable->valCols); taosArrayDestroy(pTable->valVarCols); + taosMemoryFree(pTable->primCol); } static void hJoinFreeBufPage(void* param) { @@ -305,9 +472,9 @@ static FORCE_INLINE char* hJoinRetrieveColDataFromRowBufs(SArray* pRowBufs, SBuf return pPage->data + pRow->offset; } -static FORCE_INLINE int32_t hJoinCopyResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) { - SHJoinTableInfo* pBuild = pJoin->pBuild; - SHJoinTableInfo* pProbe = pJoin->pProbe; +static int32_t hJoinCopyResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) { + SHJoinTableCtx* pBuild = pJoin->pBuild; + SHJoinTableCtx* pProbe = pJoin->pProbe; int32_t buildIdx = 0, buildValIdx = 0; int32_t probeIdx = 0; SBufRowInfo* pRow = pStart; @@ -347,7 +514,7 @@ static FORCE_INLINE int32_t hJoinCopyResRowsToBlock(SHJoinOperatorInfo* pJoin, i SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData->pDataBlock, pProbe->valCols[probeIdx].srcSlot); SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pProbe->valCols[probeIdx].dstSlot); - code = colDataCopyNItems(pDst, pRes->info.rows, colDataGetData(pSrc, pJoin->ctx.probeIdx), rowNum, colDataIsNull_s(pSrc, pJoin->ctx.probeIdx)); + code = colDataCopyNItems(pDst, pRes->info.rows, colDataGetData(pSrc, pJoin->ctx.probeStartIdx), rowNum, colDataIsNull_s(pSrc, pJoin->ctx.probeStartIdx)); if (code) { return code; } @@ -360,8 +527,37 @@ static FORCE_INLINE int32_t hJoinCopyResRowsToBlock(SHJoinOperatorInfo* pJoin, i return TSDB_CODE_SUCCESS; } +int32_t hJoinCopyNMatchRowsToBlock(SHJoinOperatorInfo* pJoin, SSDataBlock* pRes, int32_t startIdx, int32_t rows) { + SHJoinTableCtx* pBuild = pJoin->pBuild; + SHJoinTableCtx* pProbe = pJoin->pProbe; + int32_t buildIdx = 0; + int32_t probeIdx = 0; + int32_t code = 0; -static FORCE_INLINE void hJoinAppendResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes, bool* allFetched) { + for (int32_t i = 0; i < pJoin->pResColNum; ++i) { + if (pJoin->pResColMap[i]) { + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pBuild->valCols[buildIdx].dstSlot); + colDataSetNItemsNull(pDst, pRes->info.rows, rows); + + buildIdx++; + } else { + SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData->pDataBlock, pProbe->valCols[probeIdx].srcSlot); + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pProbe->valCols[probeIdx].dstSlot); + + colDataAssignNRows(pDst, pRes->info.rows, pSrc, startIdx, rows); + + probeIdx++; + } + } + + pRes->info.rows += rows; + + return TSDB_CODE_SUCCESS; +} + + + +void hJoinAppendResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes, bool* allFetched) { SHJoinOperatorInfo* pJoin = pOperator->info; SHJoinCtx* pCtx = &pJoin->ctx; SBufRowInfo* pStart = pCtx->pBuildRow; @@ -387,7 +583,7 @@ static FORCE_INLINE void hJoinAppendResToBlock(struct SOperatorInfo* pOperator, } -static FORCE_INLINE bool hJoinCopyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) { +bool hJoinCopyKeyColsDataToBuf(SHJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen) { char *pData = NULL; size_t bufLen = 0; @@ -428,61 +624,7 @@ static FORCE_INLINE bool hJoinCopyKeyColsDataToBuf(SHJoinTableInfo* pTable, int3 return false; } - -static void doHashJoinImpl(struct SOperatorInfo* pOperator) { - SHJoinOperatorInfo* pJoin = pOperator->info; - SHJoinTableInfo* pProbe = pJoin->pProbe; - SHJoinCtx* pCtx = &pJoin->ctx; - SSDataBlock* pRes = pJoin->finBlk; - size_t bufLen = 0; - bool allFetched = false; - - if (pJoin->ctx.pBuildRow) { - hJoinAppendResToBlock(pOperator, pRes, &allFetched); - if (pRes->info.rows >= pRes->info.capacity) { - if (allFetched) { - ++pCtx->probeIdx; - } - - return; - } else { - ++pCtx->probeIdx; - } - } - - for (; pCtx->probeIdx < pCtx->pProbeData->info.rows; ++pCtx->probeIdx) { - if (hJoinCopyKeyColsDataToBuf(pProbe, pCtx->probeIdx, &bufLen)) { - continue; - } - - SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen); -/* - size_t keySize = 0; - int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize); - ASSERT(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen)); - int64_t rows = getSingleKeyRowsNum(pGroup->rows); - pJoin->execInfo.expectRows += rows; - qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows); -*/ - if (pGroup) { - pCtx->pBuildRow = pGroup->rows; - hJoinAppendResToBlock(pOperator, pRes, &allFetched); - if (pRes->info.rows >= pRes->info.capacity) { - if (allFetched) { - ++pCtx->probeIdx; - } - - return; - } - } else { - qTrace("no key matched"); - } - } - - pCtx->rowRemains = false; -} - -static int32_t hJoinSetKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { +static int32_t hJoinSetKeyColsData(SSDataBlock* pBlock, SHJoinTableCtx* pTable) { for (int32_t i = 0; i < pTable->keyNum; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot); if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { @@ -503,7 +645,7 @@ static int32_t hJoinSetKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) return TSDB_CODE_SUCCESS; } -static int32_t hJoinSetValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { +static int32_t hJoinSetValColsData(SSDataBlock* pBlock, SHJoinTableCtx* pTable) { if (!pTable->valColExist) { return TSDB_CODE_SUCCESS; } @@ -534,7 +676,7 @@ static int32_t hJoinSetValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) -static FORCE_INLINE void hJoinCopyValColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx) { +static FORCE_INLINE void hJoinCopyValColsDataToBuf(SHJoinTableCtx* pTable, int32_t rowIdx) { if (!pTable->valColExist) { return; } @@ -596,7 +738,7 @@ static FORCE_INLINE int32_t hJoinGetValBufFromPages(SArray* pPages, int32_t bufS } while (true); } -static FORCE_INLINE int32_t hJoinGetValBufSize(SHJoinTableInfo* pTable, int32_t rowIdx) { +static FORCE_INLINE int32_t hJoinGetValBufSize(SHJoinTableCtx* pTable, int32_t rowIdx) { if (NULL == pTable->valVarCols) { return pTable->valBufSize; } @@ -614,7 +756,7 @@ static FORCE_INLINE int32_t hJoinGetValBufSize(SHJoinTableInfo* pTable, int32_t } -static int32_t hJoinAddRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTableInfo* pTable, size_t keyLen, int32_t rowIdx) { +static int32_t hJoinAddRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTableCtx* pTable, size_t keyLen, int32_t rowIdx) { SGroupData group = {0}; SBufRowInfo* pRow = NULL; @@ -652,7 +794,7 @@ static int32_t hJoinAddRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGro } static int32_t hJoinAddRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, size_t keyLen, int32_t rowIdx) { - SHJoinTableInfo* pBuild = pJoin->pBuild; + SHJoinTableCtx* pBuild = pJoin->pBuild; int32_t code = hJoinSetValColsData(pBlock, pBuild); if (code) { return code; @@ -669,15 +811,64 @@ static int32_t hJoinAddRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, return TSDB_CODE_SUCCESS; } +static bool hJoinFilterTimeRange(SSDataBlock* pBlock, STimeWindow* pRange, int32_t primSlot, int32_t* startIdx, int32_t* endIdx) { + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, primSlot); + if (NULL == pCol) { + qError("hash join can't get prim col, slot:%d, slotNum:%d", primSlot, (int32_t)taosArrayGetSize(pBlock->pDataBlock)); + return false; + } + + TSKEY skey = *(TSKEY*)colDataGetData(pCol, 0); + TSKEY ekey = *(TSKEY*)colDataGetData(pCol, (pBlock->info.rows - 1)); + + if (ekey < pRange->skey || skey > pRange->ekey) { + return false; + } + + if (skey >= pRange->skey && ekey <= pRange->ekey) { + *startIdx = 0; + *endIdx = pBlock->info.rows - 1; + return true; + } + + if (skey < pRange->skey && ekey > pRange->ekey) { + TSKEY *pStart = (TSKEY*)taosbsearch(&pRange->skey, pCol->pData, pBlock->info.rows, sizeof(TSKEY), compareInt64Val, TD_GE); + TSKEY *pEnd = (TSKEY*)taosbsearch(&pRange->ekey, pCol->pData, pBlock->info.rows, sizeof(TSKEY), compareInt64Val, TD_LE); + *startIdx = ((uint64_t)pStart - (uint64_t)pCol->pData) / sizeof(int64_t); + *endIdx = ((uint64_t)pEnd - (uint64_t)pCol->pData) / sizeof(int64_t); + return true; + } + + if (skey >= pRange->skey) { + TSKEY *pEnd = (TSKEY*)taosbsearch(&pRange->ekey, pCol->pData, pBlock->info.rows, sizeof(TSKEY), compareInt64Val, TD_LE); + *startIdx = 0; + *endIdx = ((uint64_t)pEnd - (uint64_t)pCol->pData) / sizeof(int64_t); + return true; + } + + TSKEY *pStart = (TSKEY*)taosbsearch(&pRange->skey, pCol->pData, pBlock->info.rows, sizeof(TSKEY), compareInt64Val, TD_GE); + *startIdx = ((uint64_t)pStart - (uint64_t)pCol->pData) / sizeof(int64_t); + *endIdx = pBlock->info.rows - 1; + + return true; +} + static int32_t hJoinAddBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) { - SHJoinTableInfo* pBuild = pJoin->pBuild; + SHJoinTableCtx* pBuild = pJoin->pBuild; + int32_t startIdx = 0, endIdx = pBlock->info.rows - 1; + if (pBuild->hasTimeRange && !hJoinFilterTimeRange(pBlock, &pJoin->tblTimeRange, pBuild->primCol->srcSlot, &startIdx, &endIdx)) { + return TSDB_CODE_SUCCESS; + } + + HJ_ERR_RET(hJoinLaunchPrimExpr(pBlock, pBuild, startIdx, endIdx)); + int32_t code = hJoinSetKeyColsData(pBlock, pBuild); if (code) { return code; } size_t bufLen = 0; - for (int32_t i = 0; i < pBlock->info.rows; ++i) { + for (int32_t i = startIdx; i <= endIdx; ++i) { if (hJoinCopyKeyColsDataToBuf(pBuild, i, &bufLen)) { continue; } @@ -690,7 +881,7 @@ static int32_t hJoinAddBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* return code; } -static int32_t hJoinBuildHash(struct SOperatorInfo* pOperator) { +static int32_t hJoinBuildHash(struct SOperatorInfo* pOperator, bool* queryDone) { SHJoinOperatorInfo* pJoin = pOperator->info; SSDataBlock* pBlock = NULL; int32_t code = TSDB_CODE_SUCCESS; @@ -710,12 +901,36 @@ static int32_t hJoinBuildHash(struct SOperatorInfo* pOperator) { } } + if (IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType) && tSimpleHashGetSize(pJoin->pKeyHash) <= 0) { + hJoinSetDone(pOperator); + *queryDone = true; + } + + qTrace("build table rows:%" PRId64, hJoinGetRowsNumOfKeyHash(pJoin->pKeyHash)); + return TSDB_CODE_SUCCESS; } static int32_t hJoinPrepareStart(struct SOperatorInfo* pOperator, SSDataBlock* pBlock) { SHJoinOperatorInfo* pJoin = pOperator->info; - SHJoinTableInfo* pProbe = pJoin->pProbe; + SHJoinTableCtx* pProbe = pJoin->pProbe; + int32_t startIdx = 0, endIdx = pBlock->info.rows - 1; + if (pProbe->hasTimeRange && !hJoinFilterTimeRange(pBlock, &pJoin->tblTimeRange, pProbe->primCol->srcSlot, &startIdx, &endIdx)) { + if (!IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType)) { + pJoin->ctx.probeEndIdx = -1; + pJoin->ctx.probePostIdx = 0; + pJoin->ctx.pProbeData = pBlock; + pJoin->ctx.rowRemains = true; + pJoin->ctx.probePhase = E_JOIN_PHASE_POST; + + HJ_ERR_RET((*pJoin->joinFp)(pOperator)); + } + + return TSDB_CODE_SUCCESS; + } + + HJ_ERR_RET(hJoinLaunchPrimExpr(pBlock, pProbe, startIdx, endIdx)); + int32_t code = hJoinSetKeyColsData(pBlock, pProbe); if (code) { return code; @@ -725,17 +940,26 @@ static int32_t hJoinPrepareStart(struct SOperatorInfo* pOperator, SSDataBlock* p return code; } - pJoin->ctx.probeIdx = 0; + pJoin->ctx.probeStartIdx = startIdx; + pJoin->ctx.probeEndIdx = endIdx; pJoin->ctx.pBuildRow = NULL; pJoin->ctx.pProbeData = pBlock; pJoin->ctx.rowRemains = true; + pJoin->ctx.probePreIdx = 0; + pJoin->ctx.probePostIdx = endIdx + 1; - doHashJoinImpl(pOperator); + if (!IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType) && startIdx > 0) { + pJoin->ctx.probePhase = E_JOIN_PHASE_PRE; + } else { + pJoin->ctx.probePhase = E_JOIN_PHASE_CUR; + } + + HJ_ERR_RET((*pJoin->joinFp)(pOperator)); return TSDB_CODE_SUCCESS; } -static void hJoinSetDone(struct SOperatorInfo* pOperator) { +void hJoinSetDone(struct SOperatorInfo* pOperator) { setOperatorCompleted(pOperator); SHJoinOperatorInfo* pInfo = pOperator->info; @@ -749,7 +973,6 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t code = TSDB_CODE_SUCCESS; SSDataBlock* pRes = pJoin->finBlk; - pRes->info.rows = 0; int64_t st = 0; if (pOperator->cost.openCost == 0) { @@ -757,30 +980,35 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) { } if (pOperator->status == OP_EXEC_DONE) { + pRes->info.rows = 0; goto _return; } if (!pJoin->keyHashBuilt) { pJoin->keyHashBuilt = true; - - code = hJoinBuildHash(pOperator); + + bool queryDone = false; + code = hJoinBuildHash(pOperator, &queryDone); if (code) { pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } - if (tSimpleHashGetSize(pJoin->pKeyHash) <= 0) { - hJoinSetDone(pOperator); + if (queryDone) { goto _return; } - - //qTrace("build table rows:%" PRId64, getRowsNumOfKeyHash(pJoin->pKeyHash)); } + blockDataCleanup(pRes); + if (pJoin->ctx.rowRemains) { - doHashJoinImpl(pOperator); + code = (*pJoin->joinFp)(pOperator); + if (code) { + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } - if (pRes->info.rows >= pRes->info.capacity && pJoin->pFinFilter != NULL) { + if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) { doFilter(pRes, pJoin->pFinFilter, NULL); } if (pRes->info.rows > 0) { @@ -804,13 +1032,14 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } - if (pRes->info.rows < pJoin->blkThreshold) { + if (!hJoinBlkReachThreshold(pJoin, pRes->info.rows)) { continue; } - if (pJoin->pFinFilter != NULL) { + if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) { doFilter(pRes, pJoin->pFinFilter, NULL); } + if (pRes->info.rows > 0) { break; } @@ -912,42 +1141,39 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n int32_t code = TSDB_CODE_SUCCESS; if (pOperator == NULL || pInfo == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; + goto _return; } + pInfo->tblTimeRange.skey = pJoinNode->timeRange.skey; + pInfo->tblTimeRange.ekey = pJoinNode->timeRange.ekey; + pInfo->ctx.limit = pJoinNode->node.pLimit ? ((SLimitNode*)pJoinNode->node.pLimit)->limit : INT64_MAX; - hJoinInitResBlocks(pInfo, pJoinNode); - setOperatorInfo(pOperator, "HashJoinOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); hJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); hJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); hJoinSetBuildAndProbeTable(pInfo, pJoinNode); - code = hJoinBuildResColsMap(pInfo, pJoinNode); - if (code) { - goto _error; - } + + HJ_ERR_JRET(hJoinBuildResColsMap(pInfo, pJoinNode)); - code = hJoinInitBufPages(pInfo); - if (code) { - goto _error; - } + HJ_ERR_JRET(hJoinInitBufPages(pInfo)); size_t hashCap = pInfo->pBuild->inputStat.inputRowNum > 0 ? (pInfo->pBuild->inputStat.inputRowNum * 1.5) : 1024; pInfo->pKeyHash = tSimpleHashInit(hashCap, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); if (pInfo->pKeyHash == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; + goto _return; } HJ_ERR_JRET(hJoinHandleConds(pInfo, pJoinNode)); - code = appendDownstream(pOperator, pDownstream, numOfDownstream); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + HJ_ERR_JRET(hJoinInitResBlocks(pInfo, pJoinNode)); + + HJ_ERR_JRET(hJoinSetImplFp(pInfo)); + + HJ_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream)); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hJoinMainProcess, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); @@ -955,7 +1181,7 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n return pOperator; -_error: +_return: if (pInfo != NULL) { destroyHashJoinOperator(pInfo); } diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index a17fc457e1..2ba1a5d376 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -31,7 +31,11 @@ static uint32_t mJoinGetFinBlkCapacity(SMJoinOperatorInfo* pJoin, SSortMergeJoin uint32_t maxRows = TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize); if (INT64_MAX != pJoin->ctx.mergeCtx.limit && NULL == pJoin->pFinFilter) { uint32_t limitMaxRows = pJoin->ctx.mergeCtx.limit / MJOIN_BLK_THRESHOLD_RATIO + 1; - return (maxRows > limitMaxRows) ? limitMaxRows : maxRows; + maxRows = TMIN(maxRows, limitMaxRows); + } + + if (JOIN_STYPE_SEMI == pJoinNode->subType || JOIN_STYPE_ANTI == pJoinNode->subType) { + maxRows = TMIN(MJOIN_SEMI_ANTI_BLK_ROWS_NUM, maxRows); } return maxRows; @@ -257,7 +261,7 @@ static int32_t mOuterJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { int32_t startGrpIdx = 0; int32_t startRowIdx = -1; - blockDataCleanup(pCtx->midBlk); + //blockDataCleanup(pCtx->midBlk); do { for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); @@ -350,7 +354,7 @@ static int32_t mOuterJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop) SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); int32_t startRowIdx = 0; - blockDataCleanup(pCtx->midBlk); + //blockDataCleanup(pCtx->midBlk); do { startRowIdx = build->grpRowIdx; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 0fe48ad282..302aa869f6 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -496,6 +496,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { COPY_SCALAR_FIELD(hashJoinHint); CLONE_NODE_FIELD(pLeftOnCond); CLONE_NODE_FIELD(pRightOnCond); + COPY_SCALAR_FIELD(timeRangeTarget); COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow)); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index f7521a7b13..a773a44c76 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2189,6 +2189,7 @@ static const char* jkJoinPhysiPlanLeftOnCond = "LeftOnCond"; static const char* jkJoinPhysiPlanRightOnCond = "RightOnCond"; static const char* jkJoinPhysiPlanTimeRangeSKey = "TimeRangeSKey"; static const char* jkJoinPhysiPlanTimeRangeEKey = "TimeRangeEKey"; +static const char* jkJoinPhysiPlanTimeRangeTarget = "TimeRangeTarget"; static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) { const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj; @@ -2333,12 +2334,27 @@ static int32_t physiHashJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanSubType, pNode->subType); + } if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkJoinPhysiPlanOnLeftCols, pNode->pOnLeft); } if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkJoinPhysiPlanOnRightCols, pNode->pOnRight); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinPhysiPlanLeftPrimExpr, nodeToJson, pNode->leftPrimExpr); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinPhysiPlanRightPrimExpr, nodeToJson, pNode->rightPrimExpr); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanLeftPrimSlotId, pNode->leftPrimSlotId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanRightPrimSlotId, pNode->rightPrimSlotId); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pFullOnCond); } @@ -2363,6 +2379,9 @@ static int32_t physiHashJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkJoinPhysiPlanRightOnCond, nodeToJson, pNode->pRightOnCond); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanTimeRangeTarget, pNode->timeRangeTarget); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanTimeRangeSKey, pNode->timeRange.skey); } @@ -2381,12 +2400,27 @@ static int32_t jsonToPhysiHashJoinNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code); } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkJoinPhysiPlanSubType, pNode->subType, code); + } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkJoinPhysiPlanOnLeftCols, &pNode->pOnLeft); } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkJoinPhysiPlanOnRightCols, &pNode->pOnRight); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinPhysiPlanLeftPrimExpr, &pNode->leftPrimExpr); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinPhysiPlanRightPrimExpr, &pNode->rightPrimExpr); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkJoinPhysiPlanLeftPrimSlotId, pNode->leftPrimSlotId, code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkJoinPhysiPlanRightPrimSlotId, pNode->rightPrimSlotId, code); + } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pFullOnCond); } @@ -2411,6 +2445,9 @@ static int32_t jsonToPhysiHashJoinNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkJoinPhysiPlanRightOnCond, &pNode->pRightOnCond); } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkJoinPhysiPlanTimeRangeTarget, pNode->timeRangeTarget, code); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBigIntValue(pJson, jkJoinPhysiPlanTimeRangeSKey, &pNode->timeRange.skey); } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 5934aff63b..13afead762 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2641,8 +2641,14 @@ static int32_t msgToPhysiMergeJoinNode(STlvDecoder* pDecoder, void* pObj) { enum { PHY_HASH_JOIN_CODE_BASE_NODE = 1, PHY_HASH_JOIN_CODE_JOIN_TYPE, + PHY_HASH_JOIN_CODE_JOIN_STYPE, PHY_HASH_JOIN_CODE_ON_LEFT_COLUMN, PHY_HASH_JOIN_CODE_ON_RIGHT_COLUMN, + PHY_HASH_JOIN_CODE_LEFT_PRIM_EXPR, + PHY_HASH_JOIN_CODE_RIGHT_PRIM_EXPR, + PHY_HASH_JOIN_CODE_LEFT_PRIM_SLOTID, + PHY_HASH_JOIN_CODE_RIGHT_PRIM_SLOTID, + PHY_HASH_JOIN_CODE_TIME_RANGE_TARGET, PHY_HASH_JOIN_CODE_ON_CONDITIONS, PHY_HASH_JOIN_CODE_TARGETS, PHY_HASH_JOIN_CODE_INPUT_ROW_NUM0, @@ -2663,12 +2669,30 @@ static int32_t physiHashJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeEnum(pEncoder, PHY_HASH_JOIN_CODE_JOIN_TYPE, pNode->joinType); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, PHY_HASH_JOIN_CODE_JOIN_STYPE, pNode->subType); + } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_ON_LEFT_COLUMN, nodeListToMsg, pNode->pOnLeft); } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_ON_RIGHT_COLUMN, nodeListToMsg, pNode->pOnRight); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_LEFT_PRIM_EXPR, nodeToMsg, pNode->leftPrimExpr); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_RIGHT_PRIM_EXPR, nodeToMsg, pNode->rightPrimExpr); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_HASH_JOIN_CODE_LEFT_PRIM_SLOTID, pNode->leftPrimSlotId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_HASH_JOIN_CODE_RIGHT_PRIM_SLOTID, pNode->rightPrimSlotId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_HASH_JOIN_CODE_TIME_RANGE_TARGET, pNode->timeRangeTarget); + } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_ON_CONDITIONS, nodeToMsg, pNode->pFullOnCond); } @@ -2717,12 +2741,30 @@ static int32_t msgToPhysiHashJoinNode(STlvDecoder* pDecoder, void* pObj) { case PHY_HASH_JOIN_CODE_JOIN_TYPE: code = tlvDecodeEnum(pTlv, &pNode->joinType, sizeof(pNode->joinType)); break; + case PHY_HASH_JOIN_CODE_JOIN_STYPE: + code = tlvDecodeEnum(pTlv, &pNode->subType, sizeof(pNode->subType)); + break; case PHY_HASH_JOIN_CODE_ON_LEFT_COLUMN: code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pOnLeft); break; case PHY_HASH_JOIN_CODE_ON_RIGHT_COLUMN: code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pOnRight); break; + case PHY_HASH_JOIN_CODE_LEFT_PRIM_EXPR: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->leftPrimExpr); + break; + case PHY_HASH_JOIN_CODE_RIGHT_PRIM_EXPR: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->rightPrimExpr); + break; + case PHY_HASH_JOIN_CODE_LEFT_PRIM_SLOTID: + code = tlvDecodeI32(pTlv, &pNode->leftPrimSlotId); + break; + case PHY_HASH_JOIN_CODE_RIGHT_PRIM_SLOTID: + code = tlvDecodeI32(pTlv, &pNode->rightPrimSlotId); + break; + case PHY_HASH_JOIN_CODE_TIME_RANGE_TARGET: + code = tlvDecodeI32(pTlv, &pNode->timeRangeTarget); + break; case PHY_HASH_JOIN_CODE_ON_CONDITIONS: code = msgToNodeFromTlv(pTlv, (void**)&pNode->pFullOnCond); break; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 896425c867..ed90ea2428 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1459,6 +1459,8 @@ void nodesDestroyNode(SNode* pNode) { destroyPhysiNode((SPhysiNode*)pPhyNode); nodesDestroyList(pPhyNode->pOnLeft); nodesDestroyList(pPhyNode->pOnRight); + nodesDestroyNode(pPhyNode->leftPrimExpr); + nodesDestroyNode(pPhyNode->rightPrimExpr); nodesDestroyNode(pPhyNode->pFullOnCond); nodesDestroyList(pPhyNode->pTargets); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index b4a4879378..bff3ab147d 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -54,18 +54,18 @@ static void setColumnInfo(SFunctionNode* pFunc, SColumnNode* pCol, bool isPartit } break; case FUNCTION_TYPE_WSTART: - if (!isPartitionBy) { - pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; - } + pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; pCol->colType = COLUMN_TYPE_WINDOW_START; - pCol->isPrimTs = true; + if (!isPartitionBy) { + pCol->isPrimTs = true; + } break; case FUNCTION_TYPE_WEND: - if (!isPartitionBy) { - pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; - } + pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; pCol->colType = COLUMN_TYPE_WINDOW_END; - pCol->isPrimTs = true; + if (!isPartitionBy) { + pCol->isPrimTs = true; + } break; case FUNCTION_TYPE_WDURATION: pCol->colType = COLUMN_TYPE_WINDOW_DURATION; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 56091fe5d9..cbaf0eb99d 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2160,6 +2160,11 @@ int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool groupSort, S } return nodesListMakeAppend(pSequencingNodes, (SNode*)pNode); } + case QUERY_NODE_LOGIC_PLAN_SORT: { + *keepSort = true; + NODES_CLEAR_LIST(*pSequencingNodes); + return TSDB_CODE_SUCCESS; + } case QUERY_NODE_LOGIC_PLAN_JOIN: { return sortPriKeyOptHandleJoinSort(pNode, groupSort, pSort, pNotOptimize, pSequencingNodes, keepSort); } @@ -2423,7 +2428,7 @@ static bool joinCondMayBeOptimized(SLogicNode* pNode) { SScanLogicNode* pLScan = (SScanLogicNode*)pLeft; SScanLogicNode* pRScan = (SScanLogicNode*)pRight; - if (!IS_TSWINDOW_SPECIFIED(pLScan->scanRange) && !IS_TSWINDOW_SPECIFIED(pLScan->scanRange)) { + if (!IS_TSWINDOW_SPECIFIED(pLScan->scanRange) && !IS_TSWINDOW_SPECIFIED(pRScan->scanRange)) { return false; } @@ -4744,7 +4749,7 @@ static bool hashJoinOptShouldBeOptimized(SLogicNode* pNode) { goto _return; } - if ((JOIN_STYPE_NONE != pJoin->subType && JOIN_STYPE_OUTER != pJoin->subType) || NULL != pJoin->pTagOnCond || NULL != pJoin->pColOnCond || pNode->pChildren->length != 2 ) { + if ((JOIN_STYPE_NONE != pJoin->subType && JOIN_STYPE_OUTER != pJoin->subType) || JOIN_TYPE_FULL == pJoin->joinType || pNode->pChildren->length != 2 ) { goto _return; } @@ -4766,8 +4771,9 @@ static int32_t hashJoinOptSplitPrimFromLogicCond(SNode **pCondition, SNode **pPr SNodeList *pPrimaryKeyConds = NULL; SNode *pCond = NULL; WHERE_EACH(pCond, pLogicCond->pParameterList) { - if (isCondColumnsFromMultiTable(pCond) || COND_TYPE_PRIMARY_KEY != classifyCondition(pCond)) { + if (filterIsMultiTableColsCond(pCond) || COND_TYPE_PRIMARY_KEY != filterClassifyCondition(pCond)) { WHERE_NEXT; + continue; } code = nodesListMakeAppend(&pPrimaryKeyConds, nodesCloneNode(pCond)); @@ -4809,11 +4815,11 @@ int32_t hashJoinOptSplitPrimCond(SNode **pCondition, SNode **pPrimaryKeyCond) { } bool needOutput = false; - if (isCondColumnsFromMultiTable(*pCondition)) { + if (filterIsMultiTableColsCond(*pCondition)) { return TSDB_CODE_SUCCESS; } - EConditionType type = classifyCondition(*pCondition); + EConditionType type = filterClassifyCondition(*pCondition); if (COND_TYPE_PRIMARY_KEY == type) { *pPrimaryKeyCond = *pCondition; *pCondition = NULL; @@ -4830,6 +4836,7 @@ static int32_t hashJoinOptRewriteJoin(SOptimizeContext* pCxt, SLogicNode* pNode, pJoin->joinAlgo = JOIN_ALGO_HASH; if (NULL != pJoin->pColOnCond) { +#if 0 EJoinType t = pJoin->joinType; EJoinSubType s = pJoin->subType; @@ -4887,8 +4894,20 @@ static int32_t hashJoinOptRewriteJoin(SOptimizeContext* pCxt, SLogicNode* pNode, pJoin->timeRange.skey = TMAX(ltimeRange.skey, rtimeRange.skey); pJoin->timeRange.ekey = TMIN(ltimeRange.ekey, rtimeRange.ekey); } +#else + SNode* pPrimaryKeyCond = NULL; + hashJoinOptSplitPrimCond(&pJoin->pColOnCond, &pPrimaryKeyCond); + if (NULL != pPrimaryKeyCond) { + bool isStrict = false; + code = getTimeRangeFromNode(&pPrimaryKeyCond, &pJoin->timeRange, &isStrict); + nodesDestroyNode(pPrimaryKeyCond); + } +#endif + } else { + pJoin->timeRange = TSWINDOW_INITIALIZER; } +#if 0 if (NULL != pJoin->pTagOnCond && !TSWINDOW_IS_EQUAL(pJoin->timeRange, TSWINDOW_DESC_INITIALIZER)) { EJoinType t = pJoin->joinType; EJoinSubType s = pJoin->subType; @@ -4917,14 +4936,16 @@ static int32_t hashJoinOptRewriteJoin(SOptimizeContext* pCxt, SLogicNode* pNode, return code; } } +#endif if (!TSWINDOW_IS_EQUAL(pJoin->timeRange, TSWINDOW_DESC_INITIALIZER)) { - FOREACH(pNode, pJoin->node.pChildren) { - if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) { + SNode* pChild = NULL; + FOREACH(pChild, pJoin->node.pChildren) { + if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild)) { continue; } - SScanLogicNode* pScan = (SScanLogicNode*)pNode; + SScanLogicNode* pScan = (SScanLogicNode*)pChild; if (TSWINDOW_IS_EQUAL(pScan->scanRange, TSWINDOW_INITIALIZER)) { continue; } else if (pJoin->timeRange.ekey < pScan->scanRange.skey || pJoin->timeRange.skey > pScan->scanRange.ekey) { @@ -4936,7 +4957,59 @@ static int32_t hashJoinOptRewriteJoin(SOptimizeContext* pCxt, SLogicNode* pNode, } } } - + + pJoin->timeRangeTarget = 0; + + if (!TSWINDOW_IS_EQUAL(pJoin->timeRange, TSWINDOW_INITIALIZER)) { + SNode* pChild = NULL; + int32_t timeRangeTarget = 1; + FOREACH(pChild, pJoin->node.pChildren) { + if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild)) { + timeRangeTarget++; + continue; + } + + SScanLogicNode* pScan = (SScanLogicNode*)pChild; + if (TSWINDOW_IS_EQUAL(pScan->scanRange, pJoin->timeRange)) { + timeRangeTarget++; + continue; + } + + bool replaced = false; + switch (pJoin->joinType) { + case JOIN_TYPE_INNER: + pScan->scanRange.skey = pJoin->timeRange.skey; + pScan->scanRange.ekey = pJoin->timeRange.ekey; + replaced = true; + break; + case JOIN_TYPE_LEFT: + if (2 == timeRangeTarget) { + pScan->scanRange.skey = pJoin->timeRange.skey; + pScan->scanRange.ekey = pJoin->timeRange.ekey; + replaced = true; + } + break; + case JOIN_TYPE_RIGHT: + if (1 == timeRangeTarget) { + pScan->scanRange.skey = pJoin->timeRange.skey; + pScan->scanRange.ekey = pJoin->timeRange.ekey; + replaced = true; + } + break; + default: + break; + } + + if (replaced) { + timeRangeTarget++; + continue; + } + + pJoin->timeRangeTarget += timeRangeTarget; + timeRangeTarget++; + } + } + pCxt->optimized = true; OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_STB_JOIN); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index c584f7527e..6cfcc3336d 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -792,7 +792,7 @@ static int32_t setColEqList(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkI return TSDB_CODE_SUCCESS; } -static int32_t setColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId, int16_t rightBlkId, SSortMergeJoinPhysiNode* pJoin) { +static int32_t setMergeJoinPrimColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId, int16_t rightBlkId, SSortMergeJoinPhysiNode* pJoin) { if (QUERY_NODE_OPERATOR == nodeType(pEqCond)) { SOperatorNode* pOp = (SOperatorNode*)pEqCond; if (pOp->opType != OP_TYPE_EQUAL && JOIN_STYPE_ASOF != subType) { @@ -922,7 +922,7 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pJoin->pPrimKeyCond); if (TSDB_CODE_SUCCESS == code) { - code = setColEqCond(pJoin->pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin); + code = setMergeJoinPrimColEqCond(pJoin->pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin); } if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) { code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc); @@ -937,7 +937,13 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->addPrimEqCond, &pPrimKeyCond); if (TSDB_CODE_SUCCESS == code) { - code = setColEqCond(pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin); + code = setMergeJoinPrimColEqCond(pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin); + } + if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) { + code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc); + } + if (TSDB_CODE_SUCCESS == code && NULL != pJoin->rightPrimExpr) { + code = addDataBlockSlot(pCxt, &pJoin->rightPrimExpr, pRightDesc); } nodesDestroyNode(pPrimKeyCond); } @@ -1127,6 +1133,107 @@ static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhys return TSDB_CODE_SUCCESS; } + +static int32_t setHashJoinPrimColEqCond(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkId, SHashJoinPhysiNode* pJoin) { + if (QUERY_NODE_OPERATOR == nodeType(pEqCond)) { + SOperatorNode* pOp = (SOperatorNode*)pEqCond; + if (pOp->opType != OP_TYPE_EQUAL) { + planError("invalid primary cond opType, opType:%d", pOp->opType); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + + switch (nodeType(pOp->pLeft)) { + case QUERY_NODE_COLUMN: { + SColumnNode* pCol = (SColumnNode*)pOp->pLeft; + if (leftBlkId == pCol->dataBlockId) { + pJoin->leftPrimSlotId = pCol->slotId; + } else if (rightBlkId == pCol->dataBlockId) { + pJoin->rightPrimSlotId = pCol->slotId; + } else { + planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + break; + } + case QUERY_NODE_FUNCTION: { + SFunctionNode* pFunc = (SFunctionNode*)pOp->pLeft; + if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) { + planError("invalid primary cond left function type, leftFuncType:%d", pFunc->funcType); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0); + if (QUERY_NODE_COLUMN != nodeType(pParam)) { + planError("invalid primary cond left timetruncate param type, leftParamType:%d", nodeType(pParam)); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + SColumnNode* pCol = (SColumnNode*)pParam; + if (leftBlkId == pCol->dataBlockId) { + pJoin->leftPrimSlotId = pCol->slotId; + pJoin->leftPrimExpr = nodesCloneNode((SNode*)pFunc); + } else if (rightBlkId == pCol->dataBlockId) { + pJoin->rightPrimSlotId = pCol->slotId; + pJoin->rightPrimExpr = nodesCloneNode((SNode*)pFunc); + } else { + planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + break; + } + default: + planError("invalid primary cond left node type, leftNodeType:%d", nodeType(pOp->pLeft)); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + + switch (nodeType(pOp->pRight)) { + case QUERY_NODE_COLUMN: { + SColumnNode* pCol = (SColumnNode*)pOp->pRight; + if (leftBlkId == pCol->dataBlockId) { + pJoin->leftPrimSlotId = pCol->slotId; + } else if (rightBlkId == pCol->dataBlockId) { + pJoin->rightPrimSlotId = pCol->slotId; + } else { + planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + break; + } + case QUERY_NODE_FUNCTION: { + SFunctionNode* pFunc = (SFunctionNode*)pOp->pRight; + if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) { + planError("invalid primary cond right function type, rightFuncType:%d", pFunc->funcType); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0); + if (QUERY_NODE_COLUMN != nodeType(pParam)) { + planError("invalid primary cond right timetruncate param type, rightParamType:%d", nodeType(pParam)); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + SColumnNode* pCol = (SColumnNode*)pParam; + if (leftBlkId == pCol->dataBlockId) { + pJoin->leftPrimSlotId = pCol->slotId; + pJoin->leftPrimExpr = nodesCloneNode((SNode*)pFunc); + } else if (rightBlkId == pCol->dataBlockId) { + pJoin->rightPrimSlotId = pCol->slotId; + pJoin->rightPrimExpr = nodesCloneNode((SNode*)pFunc); + } else { + planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + break; + } + default: + planError("invalid primary cond right node type, rightNodeType:%d", nodeType(pOp->pRight)); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + } else { + planError("invalid primary key col equal cond, type:%d", nodeType(pEqCond)); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + + return TSDB_CODE_SUCCESS; +} + + static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, SPhysiNode** pPhyNode) { SHashJoinPhysiNode* pJoin = @@ -1144,10 +1251,23 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil pJoin->pWindowOffset = nodesCloneNode(pJoinLogicNode->pWindowOffset); pJoin->pJLimit = nodesCloneNode(pJoinLogicNode->pJLimit); pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder; + pJoin->timeRangeTarget = pJoinLogicNode->timeRangeTarget; pJoin->timeRange.skey = pJoinLogicNode->timeRange.skey; pJoin->timeRange.ekey = pJoinLogicNode->timeRange.ekey; - code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pJoin->pPrimKeyCond); + if (NULL != pJoinLogicNode->pPrimKeyEqCond) { + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, + &pJoin->pPrimKeyCond); + if (TSDB_CODE_SUCCESS == code) { + code = setHashJoinPrimColEqCond(pJoin->pPrimKeyCond, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin); + } + if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) { + code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc); + } + if (TSDB_CODE_SUCCESS == code && NULL != pJoin->rightPrimExpr) { + code = addDataBlockSlot(pCxt, &pJoin->rightPrimExpr, pRightDesc); + } + } if (TSDB_CODE_SUCCESS == code) { code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqCond); } diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index a7c21092c6..3b9b348ff5 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -15,6 +15,8 @@ #include "functionMgt.h" #include "planInt.h" +#include "scalar.h" +#include "filter.h" static char* getUsageErrFormat(int32_t errCode) { switch (errCode) { diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 55030d0a96..57f2543691 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -4802,14 +4802,8 @@ static EDealRes classifyConditionImpl(SNode *pNode, void *pContext) { return DEAL_RES_CONTINUE; } -typedef enum EConditionType { - COND_TYPE_PRIMARY_KEY = 1, - COND_TYPE_TAG_INDEX, - COND_TYPE_TAG, - COND_TYPE_NORMAL -} EConditionType; -static EConditionType classifyCondition(SNode *pNode) { +EConditionType filterClassifyCondition(SNode *pNode) { SClassifyConditionCxt cxt = {.hasPrimaryKey = false, .hasTagIndexCol = false, .hasOtherCol = false}; nodesWalkExpr(pNode, classifyConditionImpl, &cxt); return cxt.hasOtherCol ? COND_TYPE_NORMAL @@ -4819,7 +4813,7 @@ static EConditionType classifyCondition(SNode *pNode) { : (cxt.hasTagIndexCol ? COND_TYPE_TAG_INDEX : COND_TYPE_TAG))); } -static bool isCondColumnsFromMultiTable(SNode *pCond) { +bool filterIsMultiTableColsCond(SNode *pCond) { SNodeList *pCondCols = nodesMakeList(); int32_t code = nodesCollectColumnsFromNode(pCond, NULL, COLLECT_COL_TYPE_ALL, &pCondCols); if (code == TSDB_CODE_SUCCESS) { @@ -4851,12 +4845,12 @@ static int32_t partitionLogicCond(SNode **pCondition, SNode **pPrimaryKeyCond, S SNodeList *pOtherConds = NULL; SNode *pCond = NULL; FOREACH(pCond, pLogicCond->pParameterList) { - if (isCondColumnsFromMultiTable(pCond)) { + if (filterIsMultiTableColsCond(pCond)) { if (NULL != pOtherCond) { code = nodesListMakeAppend(&pOtherConds, nodesCloneNode(pCond)); } } else { - switch (classifyCondition(pCond)) { + switch (filterClassifyCondition(pCond)) { case COND_TYPE_PRIMARY_KEY: if (NULL != pPrimaryKeyCond) { code = nodesListMakeAppend(&pPrimaryKeyConds, nodesCloneNode(pCond)); @@ -4942,13 +4936,13 @@ int32_t filterPartitionCond(SNode **pCondition, SNode **pPrimaryKeyCond, SNode * } bool needOutput = false; - if (isCondColumnsFromMultiTable(*pCondition)) { + if (filterIsMultiTableColsCond(*pCondition)) { if (NULL != pOtherCond) { *pOtherCond = *pCondition; needOutput = true; } } else { - switch (classifyCondition(*pCondition)) { + switch (filterClassifyCondition(*pCondition)) { case COND_TYPE_PRIMARY_KEY: if (NULL != pPrimaryKeyCond) { *pPrimaryKeyCond = *pCondition;