fix: join merge block issue

This commit is contained in:
dapan1121 2024-04-03 09:54:49 +08:00
parent 5934a137dd
commit 4217223a6a
5 changed files with 84 additions and 52 deletions

View File

@ -30,6 +30,7 @@ extern "C" {
#define MJOIN_BLK_SIZE_LIMIT 10485760 #define MJOIN_BLK_SIZE_LIMIT 10485760
#define MJOIN_ROW_BITMAP_SIZE (2 * 1048576) #define MJOIN_ROW_BITMAP_SIZE (2 * 1048576)
#endif #endif
#define MJOIN_BLK_THRESHOLD_RATIO 0.9
struct SMJoinOperatorInfo; struct SMJoinOperatorInfo;
@ -180,6 +181,7 @@ typedef struct SMJoinGrpRows {
bool seqWinGrp; \ bool seqWinGrp; \
bool groupJoin; \ bool groupJoin; \
int32_t blkThreshold; \ int32_t blkThreshold; \
int64_t limit; \
int64_t jLimit int64_t jLimit
typedef struct SMJoinCommonCtx { typedef struct SMJoinCommonCtx {
@ -199,6 +201,7 @@ typedef struct SMJoinMergeCtx {
bool seqWinGrp; bool seqWinGrp;
bool groupJoin; bool groupJoin;
int32_t blkThreshold; int32_t blkThreshold;
int64_t limit;
int64_t jLimit; int64_t jLimit;
// KEEP IT FIRST // KEEP IT FIRST
@ -244,6 +247,7 @@ typedef struct SMJoinWindowCtx {
bool seqWinGrp; bool seqWinGrp;
bool groupJoin; bool groupJoin;
int32_t blkThreshold; int32_t blkThreshold;
int64_t limit;
int64_t jLimit; int64_t jLimit;
// KEEP IT FIRST // KEEP IT FIRST

View File

@ -27,6 +27,23 @@
#include "ttypes.h" #include "ttypes.h"
#include "mergejoin.h" #include "mergejoin.h"
static uint32_t mJoinGetFinBlkCapacity(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
uint32_t maxRows = TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize);
if (INT64_MAX != pJoin->ctx.mergeCtx.limit && NULL == pJoin->pFinFilter) {
uint32_t limitMaxRows = pJoin->ctx.mergeCtx.limit / MJOIN_BLK_THRESHOLD_RATIO + 1;
return (maxRows > limitMaxRows) ? limitMaxRows : maxRows;
}
return maxRows;
}
static FORCE_INLINE bool mJoinBlkReachThreshold(SMJoinOperatorInfo* pInfo, int64_t blkRows) {
if (INT64_MAX == pInfo->ctx.mergeCtx.limit || pInfo->pFinFilter != NULL) {
return blkRows >= pInfo->ctx.mergeCtx.blkThreshold;
}
return (pInfo->execInfo.resRows + blkRows) >= pInfo->ctx.mergeCtx.limit;
}
int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) { int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) {
@ -519,7 +536,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
if (pCtx->midRemains) { if (pCtx->midRemains) {
MJ_ERR_JRET(mJoinHandleMidRemains(pCtx)); MJ_ERR_JRET(mJoinHandleMidRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
pCtx->midRemains = false; pCtx->midRemains = false;
@ -527,7 +544,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
if (pCtx->grpRemains) { if (pCtx->grpRemains) {
MJ_ERR_JRET(mLeftJoinHandleGrpRemains(pCtx)); MJ_ERR_JRET(mLeftJoinHandleGrpRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
pCtx->grpRemains = false; pCtx->grpRemains = false;
@ -547,7 +564,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
if (probeTs == pCtx->lastEqTs) { if (probeTs == pCtx->lastEqTs) {
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true)); MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
@ -562,7 +579,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
if (probeTs == buildTs) { if (probeTs == buildTs) {
pCtx->lastEqTs = probeTs; pCtx->lastEqTs = probeTs;
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false)); MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
@ -570,7 +587,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
} else if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) { } else if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) {
MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs)); MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} else { } else {
@ -594,7 +611,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false)); MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} }
@ -780,7 +797,7 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) {
if (pCtx->grpRemains) { if (pCtx->grpRemains) {
MJ_ERR_JRET(mInnerJoinHandleGrpRemains(pCtx)); MJ_ERR_JRET(mInnerJoinHandleGrpRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
pCtx->grpRemains = false; pCtx->grpRemains = false;
@ -796,7 +813,7 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) {
if (probeTs == pCtx->lastEqTs) { if (probeTs == pCtx->lastEqTs) {
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true)); MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
@ -814,7 +831,7 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) {
if (probeTs == buildTs) { if (probeTs == buildTs) {
pCtx->lastEqTs = probeTs; pCtx->lastEqTs = probeTs;
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false)); MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
@ -1112,7 +1129,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
if (pCtx->midRemains) { if (pCtx->midRemains) {
MJ_ERR_JRET(mJoinHandleMidRemains(pCtx)); MJ_ERR_JRET(mJoinHandleMidRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
pCtx->midRemains = false; pCtx->midRemains = false;
@ -1120,7 +1137,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
if (pCtx->grpRemains) { if (pCtx->grpRemains) {
MJ_ERR_JRET(mFullJoinHandleGrpRemains(pCtx)); MJ_ERR_JRET(mFullJoinHandleGrpRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
pCtx->grpRemains = false; pCtx->grpRemains = false;
@ -1128,7 +1145,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
if (pCtx->nmatchRemains) { if (pCtx->nmatchRemains) {
MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx)); MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} }
@ -1137,7 +1154,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
if (!mFullJoinRetrieve(pOperator, pJoin)) { if (!mFullJoinRetrieve(pOperator, pJoin)) {
if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) { if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) {
MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx)); MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} }
@ -1151,7 +1168,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
if (probeTs == pCtx->lastEqTs) { if (probeTs == pCtx->lastEqTs) {
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true)); MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
@ -1164,7 +1181,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) { if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) {
MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx)); MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} }
@ -1173,7 +1190,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
if (probeTs == buildTs) { if (probeTs == buildTs) {
pCtx->lastEqTs = probeTs; pCtx->lastEqTs = probeTs;
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false)); MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
@ -1182,7 +1199,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
if (!FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && probeTs != pCtx->lastEqTs && pJoin->build->rowBitmapSize > 0) { if (!FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && probeTs != pCtx->lastEqTs && pJoin->build->rowBitmapSize > 0) {
MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx)); MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} }
@ -1196,7 +1213,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
MJ_ERR_JRET(mJoinProcessGreaterGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs)); MJ_ERR_JRET(mJoinProcessGreaterGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs));
} }
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} }
@ -1204,7 +1221,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
if (pJoin->build->dsFetchDone && !FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) { if (pJoin->build->dsFetchDone && !FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) { if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) {
MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx)); MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} }
@ -1217,7 +1234,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false)); MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} }
@ -1225,7 +1242,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
if (pJoin->probe->dsFetchDone && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { if (pJoin->probe->dsFetchDone && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) { if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) {
MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx)); MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} }
@ -1238,7 +1255,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false, false)); MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} }
@ -1479,7 +1496,7 @@ SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator) {
if (pCtx->grpRemains) { if (pCtx->grpRemains) {
MJ_ERR_JRET(mSemiJoinHandleGrpRemains(pCtx)); MJ_ERR_JRET(mSemiJoinHandleGrpRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
pCtx->grpRemains = false; pCtx->grpRemains = false;
@ -1495,7 +1512,7 @@ SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator) {
if (probeTs == pCtx->lastEqTs) { if (probeTs == pCtx->lastEqTs) {
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true)); MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
@ -1513,7 +1530,7 @@ SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator) {
if (probeTs == buildTs) { if (probeTs == buildTs) {
pCtx->lastEqTs = probeTs; pCtx->lastEqTs = probeTs;
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false)); MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
@ -1750,7 +1767,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) {
if (pCtx->grpRemains) { if (pCtx->grpRemains) {
MJ_ERR_JRET(mAntiJoinHandleGrpRemains(pCtx)); MJ_ERR_JRET(mAntiJoinHandleGrpRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
pCtx->grpRemains = false; pCtx->grpRemains = false;
@ -1766,7 +1783,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) {
if (probeTs == pCtx->lastEqTs) { if (probeTs == pCtx->lastEqTs) {
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true)); MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
@ -1781,7 +1798,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) {
if (probeTs == buildTs) { if (probeTs == buildTs) {
pCtx->lastEqTs = probeTs; pCtx->lastEqTs = probeTs;
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false)); MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
@ -1789,7 +1806,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) {
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
} else if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) { } else if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) {
MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs)); MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} else { } else {
@ -1813,7 +1830,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) {
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false)); MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} }
@ -2129,7 +2146,7 @@ SSDataBlock* mAsofBackwardJoinDo(struct SOperatorInfo* pOperator) {
if (pCtx->grpRemains) { if (pCtx->grpRemains) {
MJ_ERR_JRET(mAsofBackwardHandleGrpRemains(pCtx)); MJ_ERR_JRET(mAsofBackwardHandleGrpRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
pCtx->grpRemains = false; pCtx->grpRemains = false;
@ -2149,7 +2166,7 @@ SSDataBlock* mAsofBackwardJoinDo(struct SOperatorInfo* pOperator) {
if (probeTs == pCtx->lastTs) { if (probeTs == pCtx->lastTs) {
MJ_ERR_JRET(mAsofBackwardProcessEqualGrp(pCtx, probeTs, true)); MJ_ERR_JRET(mAsofBackwardProcessEqualGrp(pCtx, probeTs, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
@ -2170,7 +2187,7 @@ SSDataBlock* mAsofBackwardJoinDo(struct SOperatorInfo* pOperator) {
if (probeTs == buildTs) { if (probeTs == buildTs) {
pCtx->lastTs = probeTs; pCtx->lastTs = probeTs;
MJ_ERR_JRET(mAsofBackwardProcessEqualGrp(pCtx, probeTs, false)); MJ_ERR_JRET(mAsofBackwardProcessEqualGrp(pCtx, probeTs, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
@ -2185,7 +2202,7 @@ SSDataBlock* mAsofBackwardJoinDo(struct SOperatorInfo* pOperator) {
MJ_ERR_JRET(mAsofBackwardHandleUnclosedGrp(pCtx, pJoin, pBuildCol, &probeTs, &buildTs)); MJ_ERR_JRET(mAsofBackwardHandleUnclosedGrp(pCtx, pJoin, pBuildCol, &probeTs, &buildTs));
} }
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} }
@ -2200,7 +2217,7 @@ SSDataBlock* mAsofBackwardJoinDo(struct SOperatorInfo* pOperator) {
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} }
@ -2524,7 +2541,7 @@ SSDataBlock* mAsofForwardJoinDo(struct SOperatorInfo* pOperator) {
if (pCtx->grpRemains) { if (pCtx->grpRemains) {
MJ_ERR_JRET(mWinJoinDumpGrpCache(pCtx)); MJ_ERR_JRET(mWinJoinDumpGrpCache(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
pCtx->grpRemains = false; pCtx->grpRemains = false;
@ -2544,7 +2561,7 @@ SSDataBlock* mAsofForwardJoinDo(struct SOperatorInfo* pOperator) {
if (probeTs == pCtx->lastTs) { if (probeTs == pCtx->lastTs) {
MJ_ERR_JRET(mAsofForwardProcessEqualGrp(pCtx, probeTs, true)); MJ_ERR_JRET(mAsofForwardProcessEqualGrp(pCtx, probeTs, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
@ -2559,7 +2576,7 @@ SSDataBlock* mAsofForwardJoinDo(struct SOperatorInfo* pOperator) {
if (probeTs == buildTs) { if (probeTs == buildTs) {
pCtx->lastTs = probeTs; pCtx->lastTs = probeTs;
MJ_ERR_JRET(mAsofForwardProcessEqualGrp(pCtx, probeTs, false)); MJ_ERR_JRET(mAsofForwardProcessEqualGrp(pCtx, probeTs, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
@ -2574,7 +2591,7 @@ SSDataBlock* mAsofForwardJoinDo(struct SOperatorInfo* pOperator) {
MJ_ERR_JRET(mAsofForwardSkipBuildGrp(pCtx, pJoin, &pBuildCol, &probeTs, &buildTs)); MJ_ERR_JRET(mAsofForwardSkipBuildGrp(pCtx, pJoin, &pBuildCol, &probeTs, &buildTs));
} }
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} }
@ -2588,7 +2605,7 @@ SSDataBlock* mAsofForwardJoinDo(struct SOperatorInfo* pOperator) {
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} }
@ -3206,7 +3223,7 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) {
if (pCtx->grpRemains) { if (pCtx->grpRemains) {
MJ_ERR_JRET(mWinJoinDumpGrpCache(pCtx)); MJ_ERR_JRET(mWinJoinDumpGrpCache(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold || (pCtx->finBlk->info.rows > 0 && pCtx->seqWinGrp)) { if ((mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) || (pCtx->finBlk->info.rows > 0 && pCtx->seqWinGrp)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
pCtx->grpRemains = false; pCtx->grpRemains = false;
@ -3237,7 +3254,7 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) {
MJ_ERR_JRET(mWinJoinTrimDumpGrpCache(pCtx)); MJ_ERR_JRET(mWinJoinTrimDumpGrpCache(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold || (pCtx->finBlk->info.rows > 0 && pCtx->seqWinGrp)) { if ((mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) || (pCtx->finBlk->info.rows > 0 && pCtx->seqWinGrp)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} }
@ -3348,9 +3365,9 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p
} }
pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
blockDataEnsureCapacity(pCtx->finBlk, TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize)); blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode));
pCtx->blkThreshold = pCtx->finBlk->info.capacity * 0.9; pCtx->blkThreshold = pCtx->finBlk->info.capacity * MJOIN_BLK_THRESHOLD_RATIO;
MJ_ERR_RET(mJoinInitWindowCache(&pCtx->cache, pJoin, pCtx)); MJ_ERR_RET(mJoinInitWindowCache(&pCtx->cache, pJoin, pCtx));
@ -3387,14 +3404,15 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ
pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
ASSERT(pJoinNode->node.pOutputDataBlockDesc->totalRowSize > 0); ASSERT(pJoinNode->node.pOutputDataBlockDesc->totalRowSize > 0);
blockDataEnsureCapacity(pCtx->finBlk, TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize));
blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode));
if (pJoin->pFPreFilter) { if (pJoin->pFPreFilter) {
pCtx->midBlk = createOneDataBlock(pCtx->finBlk, false); pCtx->midBlk = createOneDataBlock(pCtx->finBlk, false);
blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity); blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity);
} }
pCtx->blkThreshold = pCtx->finBlk->info.capacity * 0.9; pCtx->blkThreshold = pCtx->finBlk->info.capacity * MJOIN_BLK_THRESHOLD_RATIO;
switch (pJoin->joinType) { switch (pJoin->joinType) {
case JOIN_TYPE_INNER: case JOIN_TYPE_INNER:

View File

@ -28,7 +28,6 @@
#include "functionMgt.h" #include "functionMgt.h"
#include "mergejoin.h" #include "mergejoin.h"
int32_t mJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) { int32_t mJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) {
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId); SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
@ -426,8 +425,10 @@ _err:
int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) { int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) {
SSDataBlock* pLess = NULL; SSDataBlock* pLess = *ppMid;
SSDataBlock* pMore = NULL; SSDataBlock* pMore = *ppFin;
/*
if ((*ppMid)->info.rows < (*ppFin)->info.rows) { if ((*ppMid)->info.rows < (*ppFin)->info.rows) {
pLess = (*ppMid); pLess = (*ppMid);
pMore = (*ppFin); pMore = (*ppFin);
@ -435,6 +436,7 @@ int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBl
pLess = (*ppFin); pLess = (*ppFin);
pMore = (*ppMid); pMore = (*ppMid);
} }
*/
int32_t totalRows = pMore->info.rows + pLess->info.rows; int32_t totalRows = pMore->info.rows + pLess->info.rows;
if (totalRows <= pMore->info.capacity) { if (totalRows <= pMore->info.capacity) {
@ -448,9 +450,11 @@ int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBl
pCtx->midRemains = true; pCtx->midRemains = true;
} }
/*
if (pMore != (*ppFin)) { if (pMore != (*ppFin)) {
TSWAP(*ppMid, *ppFin); TSWAP(*ppMid, *ppFin);
} }
*/
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1046,6 +1050,7 @@ static FORCE_INLINE SSDataBlock* mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, SM
static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
pJoin->ctx.mergeCtx.groupJoin = pJoinNode->grpJoin; pJoin->ctx.mergeCtx.groupJoin = pJoinNode->grpJoin;
pJoin->ctx.mergeCtx.limit = pJoinNode->node.pLimit ? ((SLimitNode*)pJoinNode->node.pLimit)->limit : INT64_MAX;
pJoin->retrieveFp = pJoinNode->grpJoin ? mJoinGrpRetrieveImpl : mJoinRetrieveImpl; pJoin->retrieveFp = pJoinNode->grpJoin ? mJoinGrpRetrieveImpl : mJoinRetrieveImpl;
pJoin->outBlkId = pJoinNode->node.pOutputDataBlockDesc->dataBlockId; pJoin->outBlkId = pJoinNode->node.pOutputDataBlockDesc->dataBlockId;
@ -1568,7 +1573,8 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) {
if (pOperator->cost.openCost == 0) { if (pOperator->cost.openCost == 0) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
} }
pJoin->execInfo.resRows += pBlock ? pBlock->info.rows : 0;
return (pBlock && pBlock->info.rows > 0) ? pBlock : NULL; return (pBlock && pBlock->info.rows > 0) ? pBlock : NULL;
} }

View File

@ -352,7 +352,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// continue merge data, ignore the group id // continue merge data, ignore the group id
blockDataMerge(pFinalRes, pRes); blockDataMerge(pFinalRes, pRes);
if (pFinalRes->info.rows + pRes->info.rows <= pOperator->resultInfo.threshold) { if (pFinalRes->info.rows + pRes->info.rows <= pOperator->resultInfo.threshold && (pOperator->status != OP_EXEC_DONE)) {
continue; continue;
} }
} }

View File

@ -2761,7 +2761,7 @@ static bool eliminateProjOptMayBeOptimized(SLogicNode* pNode) {
} }
if (NULL != pNode->pParent && (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) || if (NULL != pNode->pParent && (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0)))) { QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0)) || QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode->pParent))) {
return false; return false;
} }
@ -4210,6 +4210,10 @@ static bool pushDownLimitTo(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPu
} }
return true; return true;
} }
case QUERY_NODE_LOGIC_PLAN_JOIN: {
cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_LIMIT);
break;
}
default: default:
break; break;
} }