From 4217223a6ac14ee7b866e5615c8da7dd8de9a894 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 3 Apr 2024 09:54:49 +0800 Subject: [PATCH] fix: join merge block issue --- source/libs/executor/inc/mergejoin.h | 4 + source/libs/executor/src/mergejoin.c | 110 +++++++++++-------- source/libs/executor/src/mergejoinoperator.c | 14 ++- source/libs/executor/src/projectoperator.c | 2 +- source/libs/planner/src/planOptimizer.c | 6 +- 5 files changed, 84 insertions(+), 52 deletions(-) diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 8d6b25320c..5f851af0dc 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -30,6 +30,7 @@ extern "C" { #define MJOIN_BLK_SIZE_LIMIT 10485760 #define MJOIN_ROW_BITMAP_SIZE (2 * 1048576) #endif +#define MJOIN_BLK_THRESHOLD_RATIO 0.9 struct SMJoinOperatorInfo; @@ -180,6 +181,7 @@ typedef struct SMJoinGrpRows { bool seqWinGrp; \ bool groupJoin; \ int32_t blkThreshold; \ + int64_t limit; \ int64_t jLimit typedef struct SMJoinCommonCtx { @@ -199,6 +201,7 @@ typedef struct SMJoinMergeCtx { bool seqWinGrp; bool groupJoin; int32_t blkThreshold; + int64_t limit; int64_t jLimit; // KEEP IT FIRST @@ -244,6 +247,7 @@ typedef struct SMJoinWindowCtx { bool seqWinGrp; bool groupJoin; int32_t blkThreshold; + int64_t limit; int64_t jLimit; // KEEP IT FIRST diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index ee8d3b5323..a17fc457e1 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -27,6 +27,23 @@ #include "ttypes.h" #include "mergejoin.h" +static uint32_t mJoinGetFinBlkCapacity(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { /* row capacity for finBlk: the size-based cap, reduced to limit/ratio+1 when a LIMIT is set and there is no final filter */ + uint32_t maxRows = TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize); + if (INT64_MAX != pJoin->ctx.mergeCtx.limit && NULL == pJoin->pFinFilter) { + double limitMaxRows = 
pJoin->ctx.mergeCtx.limit / MJOIN_BLK_THRESHOLD_RATIO + 1; /* kept as double: narrowing an out-of-range value to uint32_t is undefined, so compare first */ + return (maxRows > limitMaxRows) ? (uint32_t)limitMaxRows : maxRows; /* the cast only runs when the value is already below maxRows */ + } + + return maxRows; +} + +static FORCE_INLINE bool mJoinBlkReachThreshold(SMJoinOperatorInfo* pInfo, int64_t blkRows) { /* true when the output block should be returned to the caller */ + if (INT64_MAX == pInfo->ctx.mergeCtx.limit || pInfo->pFinFilter != NULL) { + return blkRows >= pInfo->ctx.mergeCtx.blkThreshold; + } + + return blkRows >= pInfo->ctx.mergeCtx.blkThreshold || (pInfo->execInfo.resRows + blkRows) >= pInfo->ctx.mergeCtx.limit; /* blkThreshold still applies: capacity is clamped to maxRows, so a large LIMIT can exceed what one block holds */ +} int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) { @@ -519,7 +536,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { if (pCtx->midRemains) { MJ_ERR_JRET(mJoinHandleMidRemains(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } pCtx->midRemains = false; @@ -527,7 +544,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { if (pCtx->grpRemains) { MJ_ERR_JRET(mLeftJoinHandleGrpRemains(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } pCtx->grpRemains = false; @@ -547,7 +564,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { if (probeTs == pCtx->lastEqTs) { MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } @@ -562,7 +579,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { if (probeTs == buildTs) { pCtx->lastEqTs = probeTs; MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } @@ -570,7 +587,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); } else if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) { 
MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } } else { @@ -594,7 +611,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } } @@ -780,7 +797,7 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) { if (pCtx->grpRemains) { MJ_ERR_JRET(mInnerJoinHandleGrpRemains(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } pCtx->grpRemains = false; @@ -796,7 +813,7 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) { if (probeTs == pCtx->lastEqTs) { MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } @@ -814,7 +831,7 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) { if (probeTs == buildTs) { pCtx->lastEqTs = probeTs; MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } @@ -1112,7 +1129,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { if (pCtx->midRemains) { MJ_ERR_JRET(mJoinHandleMidRemains(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } pCtx->midRemains = false; @@ -1120,7 +1137,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { if (pCtx->grpRemains) { 
MJ_ERR_JRET(mFullJoinHandleGrpRemains(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } pCtx->grpRemains = false; @@ -1128,7 +1145,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { if (pCtx->nmatchRemains) { MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } } @@ -1137,7 +1154,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { if (!mFullJoinRetrieve(pOperator, pJoin)) { if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) { MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } } @@ -1151,7 +1168,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { if (probeTs == pCtx->lastEqTs) { MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } @@ -1164,7 +1181,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) { MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } } @@ -1173,7 +1190,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { if (probeTs == buildTs) { pCtx->lastEqTs = probeTs; MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } @@ -1182,7 +1199,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { if 
(!FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && probeTs != pCtx->lastEqTs && pJoin->build->rowBitmapSize > 0) { MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } } @@ -1196,7 +1213,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { MJ_ERR_JRET(mJoinProcessGreaterGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs)); } - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } } @@ -1204,7 +1221,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { if (pJoin->build->dsFetchDone && !FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) { if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) { MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } } @@ -1217,7 +1234,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } } @@ -1225,7 +1242,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { if (pJoin->probe->dsFetchDone && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) { MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } } @@ -1238,7 +1255,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; 
MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false, false)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } } @@ -1479,7 +1496,7 @@ SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator) { if (pCtx->grpRemains) { MJ_ERR_JRET(mSemiJoinHandleGrpRemains(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } pCtx->grpRemains = false; @@ -1495,7 +1512,7 @@ SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator) { if (probeTs == pCtx->lastEqTs) { MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } @@ -1513,7 +1530,7 @@ SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator) { if (probeTs == buildTs) { pCtx->lastEqTs = probeTs; MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } @@ -1750,7 +1767,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) { if (pCtx->grpRemains) { MJ_ERR_JRET(mAntiJoinHandleGrpRemains(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } pCtx->grpRemains = false; @@ -1766,7 +1783,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) { if (probeTs == pCtx->lastEqTs) { MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } @@ -1781,7 +1798,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) { if (probeTs == buildTs) { pCtx->lastEqTs = probeTs; 
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } @@ -1789,7 +1806,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) { MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); } else if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) { MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } } else { @@ -1813,7 +1830,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) { pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } } @@ -2129,7 +2146,7 @@ SSDataBlock* mAsofBackwardJoinDo(struct SOperatorInfo* pOperator) { if (pCtx->grpRemains) { MJ_ERR_JRET(mAsofBackwardHandleGrpRemains(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } pCtx->grpRemains = false; @@ -2149,7 +2166,7 @@ SSDataBlock* mAsofBackwardJoinDo(struct SOperatorInfo* pOperator) { if (probeTs == pCtx->lastTs) { MJ_ERR_JRET(mAsofBackwardProcessEqualGrp(pCtx, probeTs, true)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } @@ -2170,7 +2187,7 @@ SSDataBlock* mAsofBackwardJoinDo(struct SOperatorInfo* pOperator) { if (probeTs == buildTs) { pCtx->lastTs = probeTs; MJ_ERR_JRET(mAsofBackwardProcessEqualGrp(pCtx, probeTs, false)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return 
pCtx->finBlk; } @@ -2185,7 +2202,7 @@ SSDataBlock* mAsofBackwardJoinDo(struct SOperatorInfo* pOperator) { MJ_ERR_JRET(mAsofBackwardHandleUnclosedGrp(pCtx, pJoin, pBuildCol, &probeTs, &buildTs)); } - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } } @@ -2200,7 +2217,7 @@ SSDataBlock* mAsofBackwardJoinDo(struct SOperatorInfo* pOperator) { pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } } @@ -2524,7 +2541,7 @@ SSDataBlock* mAsofForwardJoinDo(struct SOperatorInfo* pOperator) { if (pCtx->grpRemains) { MJ_ERR_JRET(mWinJoinDumpGrpCache(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } pCtx->grpRemains = false; @@ -2544,7 +2561,7 @@ SSDataBlock* mAsofForwardJoinDo(struct SOperatorInfo* pOperator) { if (probeTs == pCtx->lastTs) { MJ_ERR_JRET(mAsofForwardProcessEqualGrp(pCtx, probeTs, true)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } @@ -2559,7 +2576,7 @@ SSDataBlock* mAsofForwardJoinDo(struct SOperatorInfo* pOperator) { if (probeTs == buildTs) { pCtx->lastTs = probeTs; MJ_ERR_JRET(mAsofForwardProcessEqualGrp(pCtx, probeTs, false)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } @@ -2574,7 +2591,7 @@ SSDataBlock* mAsofForwardJoinDo(struct SOperatorInfo* pOperator) { MJ_ERR_JRET(mAsofForwardSkipBuildGrp(pCtx, pJoin, &pBuildCol, &probeTs, &buildTs)); } - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } } @@ -2588,7 +2605,7 @@ SSDataBlock* 
mAsofForwardJoinDo(struct SOperatorInfo* pOperator) { pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) { return pCtx->finBlk; } } @@ -3206,7 +3223,7 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) { if (pCtx->grpRemains) { MJ_ERR_JRET(mWinJoinDumpGrpCache(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold || (pCtx->finBlk->info.rows > 0 && pCtx->seqWinGrp)) { + if ((mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) || (pCtx->finBlk->info.rows > 0 && pCtx->seqWinGrp)) { return pCtx->finBlk; } pCtx->grpRemains = false; @@ -3237,7 +3254,7 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) { MJ_ERR_JRET(mWinJoinTrimDumpGrpCache(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold || (pCtx->finBlk->info.rows > 0 && pCtx->seqWinGrp)) { + if ((mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) || (pCtx->finBlk->info.rows > 0 && pCtx->seqWinGrp)) { return pCtx->finBlk; } } @@ -3348,9 +3365,9 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p } pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); - blockDataEnsureCapacity(pCtx->finBlk, TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize)); + blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode)); - pCtx->blkThreshold = pCtx->finBlk->info.capacity * 0.9; + pCtx->blkThreshold = pCtx->finBlk->info.capacity * MJOIN_BLK_THRESHOLD_RATIO; MJ_ERR_RET(mJoinInitWindowCache(&pCtx->cache, pJoin, pCtx)); @@ -3387,14 +3404,15 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); ASSERT(pJoinNode->node.pOutputDataBlockDesc->totalRowSize > 0); - blockDataEnsureCapacity(pCtx->finBlk, TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, 
MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize)); + + blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode)); if (pJoin->pFPreFilter) { pCtx->midBlk = createOneDataBlock(pCtx->finBlk, false); blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity); } - pCtx->blkThreshold = pCtx->finBlk->info.capacity * 0.9; + pCtx->blkThreshold = pCtx->finBlk->info.capacity * MJOIN_BLK_THRESHOLD_RATIO; switch (pJoin->joinType) { case JOIN_TYPE_INNER: diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index e13b0f63d5..faaebc1cd8 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -28,7 +28,6 @@ #include "functionMgt.h" #include "mergejoin.h" - int32_t mJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) { SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId); @@ -426,8 +425,10 @@ _err: int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) { - SSDataBlock* pLess = NULL; - SSDataBlock* pMore = NULL; + SSDataBlock* pLess = *ppMid; + SSDataBlock* pMore = *ppFin; + +/* if ((*ppMid)->info.rows < (*ppFin)->info.rows) { pLess = (*ppMid); pMore = (*ppFin); @@ -435,6 +436,7 @@ int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBl pLess = (*ppFin); pMore = (*ppMid); } +*/ int32_t totalRows = pMore->info.rows + pLess->info.rows; if (totalRows <= pMore->info.capacity) { @@ -448,9 +450,11 @@ int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBl pCtx->midRemains = true; } +/* if (pMore != (*ppFin)) { TSWAP(*ppMid, *ppFin); } +*/ return TSDB_CODE_SUCCESS; } @@ -1046,6 +1050,7 @@ static FORCE_INLINE SSDataBlock* mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, SM static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { 
pJoin->ctx.mergeCtx.groupJoin = pJoinNode->grpJoin; + pJoin->ctx.mergeCtx.limit = pJoinNode->node.pLimit ? ((SLimitNode*)pJoinNode->node.pLimit)->limit : INT64_MAX; pJoin->retrieveFp = pJoinNode->grpJoin ? mJoinGrpRetrieveImpl : mJoinRetrieveImpl; pJoin->outBlkId = pJoinNode->node.pOutputDataBlockDesc->dataBlockId; @@ -1568,7 +1573,8 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { if (pOperator->cost.openCost == 0) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } - + + pJoin->execInfo.resRows += pBlock ? pBlock->info.rows : 0; return (pBlock && pBlock->info.rows > 0) ? pBlock : NULL; } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index c6ebb04446..5e6aa39543 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -352,7 +352,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // continue merge data, ignore the group id blockDataMerge(pFinalRes, pRes); - if (pFinalRes->info.rows + pRes->info.rows <= pOperator->resultInfo.threshold) { + if (pFinalRes->info.rows + pRes->info.rows <= pOperator->resultInfo.threshold && (pOperator->status != OP_EXEC_DONE)) { continue; } } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 48310a37c1..1d2d7f280f 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2761,7 +2761,7 @@ static bool eliminateProjOptMayBeOptimized(SLogicNode* pNode) { } if (NULL != pNode->pParent && (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) || - QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0)))) { + QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0)) || QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode->pParent))) { return false; } @@ -4210,6 +4210,10 @@ static bool pushDownLimitTo(SLogicNode* pNodeWithLimit, 
SLogicNode* pNodeLimitPu } return true; } + case QUERY_NODE_LOGIC_PLAN_JOIN: { + cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_LIMIT); + break; + } default: break; }