From 5f0440a42c2a1e332f7ab99992227467eaf0c76b Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 10 Jan 2024 18:56:49 +0800 Subject: [PATCH] enh: support semi/anti join --- source/libs/executor/inc/mergejoin.h | 19 +- source/libs/executor/src/mergejoin.c | 629 +++++++++++++- source/libs/executor/src/mergejoinoperator.c | 304 ++++++- source/libs/executor/test/joinTests.cpp | 849 +++++++++++++++++-- 4 files changed, 1699 insertions(+), 102 deletions(-) diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index a25c26a389..0959c15f00 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -19,9 +19,9 @@ extern "C" { #endif -#if 0 +#if 1 #define MJOIN_DEFAULT_BLK_ROWS_NUM 2 //4096 -#define MJOIN_HJOIN_CART_THRESHOLD 16 +#define MJOIN_HJOIN_CART_THRESHOLD 2 #define MJOIN_BLK_SIZE_LIMIT 0 //10485760 #define MJOIN_ROW_BITMAP_SIZE (2 * 1048576) #else @@ -114,6 +114,8 @@ typedef struct SMJoinTableCtx { int64_t grpTotalRows; int32_t grpIdx; + bool noKeepEqGrpRows; + bool multiEqGrpRows; SArray* eqGrps; SArray* createdBlks; @@ -122,6 +124,7 @@ typedef struct SMJoinTableCtx { int32_t grpArrayIdx; SArray* pGrpArrays; + bool multiRowsGrp; int32_t grpRowIdx; SArray* pHashCurGrp; SMJoinHashGrpRows* pHashGrpRows; @@ -168,9 +171,9 @@ typedef struct SMJoinMergeCtx { joinCartFp mergeCartFp; } SMJoinMergeCtx; -typedef struct SMJoinWinCtx { +typedef struct SMJoinWindowCtx { -} SMJoinWinCtx; +} SMJoinWindowCtx; typedef struct SMJoinFlowFlags { @@ -183,8 +186,8 @@ typedef struct SMJoinFlowFlags { typedef struct SMJoinCtx { SMJoinFlowFlags* pFlags; union { - SMJoinMergeCtx mergeCtx; - SMJoinWinCtx winCtx; + SMJoinMergeCtx mergeCtx; + SMJoinWindowCtx windowCtx; }; } SMJoinCtx; @@ -279,6 +282,8 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator); SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator); SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator); +SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator); +SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator); bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb); void mJoinSetDone(SOperatorInfo* pOperator); bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen); @@ -299,6 +304,8 @@ int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t *rowBitmapOffset); int32_t mJoinProcessUnreachGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs); int32_t mJoinProcessOverGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs); +int32_t mJoinFilterAndKeepSingleRow(SSDataBlock* pBlock, SFilterInfo* pFilterInfo); +int32_t mJoinFilterAndNoKeepRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo); #ifdef __cplusplus } diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index 5677e83992..1f6a23aa9c 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -642,7 +642,7 @@ static bool mInnerJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJo buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); } - if (!probeGot || !buildGot) { + if (!probeGot) { mJoinSetDone(pOperator); return false; } @@ -684,11 +684,14 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) { return pCtx->finBlk; } - if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) { + if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) || MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { continue; - } else { - MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); - } + } + + MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); + } else if (MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { + mJoinSetDone(pOperator); + break; } do { @@ -706,7 +709,9 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) { MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); continue; - } else if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) { + } + + if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) { if (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) { MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); continue; @@ -759,8 +764,6 @@ static int32_t mFullJoinMergeCart(SMJoinMergeCtx* pCtx) { return (NULL == pCtx->pJoin->pFPreFilter) ? mOuterJoinMergeFullCart(pCtx) : mOuterJoinMergeSeqCart(pCtx); } -const uint8_t lowest_bit_bitmap[] = {32, 7, 6, 32, 5, 3, 32, 0, 4, 1, 2}; - static FORCE_INLINE int32_t mFullJoinOutputHashRow(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows, int32_t idx) { SMJoinGrpRows grp = {0}; SMJoinRowPos* pPos = taosArrayGet(pGrpRows->pRows, idx); @@ -785,6 +788,7 @@ static int32_t mFullJoinOutputHashGrpRows(SMJoinMergeCtx* pCtx, SMJoinHashGrpRow } static int32_t mFullJoinHandleHashGrpRemains(SMJoinMergeCtx* pCtx) { + static const uint8_t lowest_bit_bitmap[] = {32, 7, 6, 32, 5, 3, 32, 0, 4, 1, 2}; SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinNMatchCtx* pNMatch = &build->nMatchCtx; if (NULL == pNMatch->pGrp) { @@ -891,6 +895,7 @@ static int32_t mFullJoinOutputMergeGrpRows(SMJoinMergeCtx* pCtx, SMJoinGrpRows* static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) { + static const uint8_t lowest_bit_bitmap[] = {32, 7, 6, 32, 5, 3, 32, 0, 4, 1, 2}; SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinNMatchCtx* pNMatch = &build->nMatchCtx; bool grpDone = false; @@ -906,7 +911,7 @@ static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) { } if (pGrpRows->rowMatchNum <= 0 || pGrpRows->allRowsNMatch) { - if (pGrpRows->rowMatchNum <= 0) { + if (!pGrpRows->allRowsNMatch) { pGrpRows->allRowsNMatch = true; pNMatch->rowIdx = pGrpRows->beginIdx; } @@ -952,10 +957,14 @@ static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) { break; } } + + if (BLK_IS_FULL(pCtx->finBlk)) { + break; + } } if (BLK_IS_FULL(pCtx->finBlk)) { - if (pNMatch->bitIdx == bitBytes) { + if (pNMatch->bitIdx >= bitBytes) { ++pNMatch->grpIdx; pNMatch->bitIdx = 0; } @@ -1132,6 +1141,583 @@ _return: } +static int32_t mSemiJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, SMJoinGrpRows* probeGrp) { + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; + + do { + blockDataCleanup(pCtx->midBlk); + + mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build); + + if (pCtx->midBlk->info.rows > 0) { + MJ_ERR_RET(mJoinFilterAndKeepSingleRow(pCtx->midBlk, pCtx->pJoin->pPreFilter)); + } + + if (pCtx->midBlk->info.rows <= 0) { + if (build->grpRowIdx < 0) { + break; + } + + continue; + } + + ASSERT(1 == pCtx->midBlk->info.rows); + MJ_ERR_RET(mJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk)); + ASSERT(false == pCtx->midRemains); + + break; + } while (true); + + return TSDB_CODE_SUCCESS; +} + + +static int32_t mSemiJoinHashSeqCart(SMJoinMergeCtx* pCtx) { + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; + SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0); + + size_t bufLen = 0; + int32_t probeEndIdx = probeGrp->endIdx; + for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); probeGrp->readIdx++) { + if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) { + continue; + } + + void* pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen); + if (NULL == pGrp) { + continue; + } + + build->pHashCurGrp = *(SArray**)pGrp; + build->grpRowIdx = 0; + + probeGrp->endIdx = probeGrp->readIdx; + MJ_ERR_RET(mSemiJoinHashGrpCartFilter(pCtx, probeGrp)); + probeGrp->endIdx = probeEndIdx; + } + + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + + return TSDB_CODE_SUCCESS; +} + + +static int32_t mSemiJoinHashFullCart(SMJoinMergeCtx* pCtx) { + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; + SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); + size_t bufLen = 0; + + for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) { + if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) { + continue; + } + + void* pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen); + if (NULL == pGrp) { + continue; + } + + build->pHashCurGrp = *(SArray**)pGrp; + ASSERT(1 == taosArrayGetSize(build->pHashCurGrp)); + build->grpRowIdx = 0; + mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build); + ASSERT(build->grpRowIdx < 0); + } + + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + + return TSDB_CODE_SUCCESS; +} + + +static int32_t mSemiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; + SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); + SMJoinGrpRows* buildGrp = NULL; + int32_t buildGrpNum = taosArrayGetSize(build->eqGrps); + int32_t probeEndIdx = probeGrp->endIdx; + int32_t rowsLeft = pCtx->midBlk->info.capacity; + + do { + for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); + ++probeGrp->readIdx, probeGrp->endIdx = probeEndIdx, build->grpIdx = 0) { + probeGrp->endIdx = probeGrp->readIdx; + + rowsLeft = pCtx->midBlk->info.capacity; + + blockDataCleanup(pCtx->midBlk); + for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) { + buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); + + if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) { + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp)); + rowsLeft -= GRP_REMAIN_ROWS(buildGrp); + buildGrp->readIdx = buildGrp->beginIdx; + continue; + } + + int32_t buildEndIdx = buildGrp->endIdx; + buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1; + ASSERT(buildGrp->endIdx >= buildGrp->readIdx); + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp)); + buildGrp->readIdx += rowsLeft; + buildGrp->endIdx = buildEndIdx; + rowsLeft = 0; + break; + } + + if (pCtx->midBlk->info.rows > 0) { + MJ_ERR_RET(mJoinFilterAndKeepSingleRow(pCtx->midBlk, pCtx->pJoin->pFPreFilter)); + } + + if (0 == pCtx->midBlk->info.rows) { + if (build->grpIdx == buildGrpNum) { + continue; + } + } else { + ASSERT(1 == pCtx->midBlk->info.rows); + MJ_ERR_RET(mJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk)); + ASSERT(false == pCtx->midRemains); + + if (build->grpIdx == buildGrpNum) { + continue; + } + + buildGrp->readIdx = buildGrp->beginIdx; + continue; + } + + //need break + + probeGrp->endIdx = probeEndIdx; + break; + } + + if (GRP_DONE(probeGrp) || BLK_IS_FULL(pCtx->finBlk)) { + break; + } + } while (true); + + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + + return TSDB_CODE_SUCCESS; +} + + +static int32_t mSemiJoinMergeFullCart(SMJoinMergeCtx* pCtx) { + int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; + SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0); + SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, 0); + int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); + int32_t probeEndIdx = probeGrp->endIdx; + + ASSERT(1 == taosArrayGetSize(build->eqGrps)); + ASSERT(buildGrp->beginIdx == buildGrp->endIdx); + + if (probeRows <= rowsLeft) { + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); + + pCtx->grpRemains = false; + return TSDB_CODE_SUCCESS; + } + + probeGrp->endIdx = probeGrp->readIdx + rowsLeft - 1; + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); + probeGrp->readIdx = probeGrp->endIdx + 1; + probeGrp->endIdx = probeEndIdx; + + pCtx->grpRemains = true; + + return TSDB_CODE_SUCCESS; +} + + +static int32_t mSemiJoinHashCart(SMJoinMergeCtx* pCtx) { + return (NULL == pCtx->pJoin->pPreFilter) ? mSemiJoinHashFullCart(pCtx) : mSemiJoinHashSeqCart(pCtx); +} + +static int32_t mSemiJoinMergeCart(SMJoinMergeCtx* pCtx) { + return (NULL == pCtx->pJoin->pFPreFilter) ? mSemiJoinMergeFullCart(pCtx) : mSemiJoinMergeSeqCart(pCtx); +} + +static FORCE_INLINE int32_t mSemiJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { + return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx); +} + + +SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator) { + SMJoinOperatorInfo* pJoin = pOperator->info; + SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; + int32_t code = TSDB_CODE_SUCCESS; + int64_t probeTs = 0; + int64_t buildTs = 0; + SColumnInfoData* pBuildCol = NULL; + SColumnInfoData* pProbeCol = NULL; + + blockDataCleanup(pCtx->finBlk); + + if (pCtx->grpRemains) { + MJ_ERR_JRET(mSemiJoinHandleGrpRemains(pCtx)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + pCtx->grpRemains = false; + } + + do { + if (!mInnerJoinRetrieve(pOperator, pJoin, pCtx)) { + break; + } + + MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); + MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); + + if (probeTs == pCtx->lastEqTs) { + MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + + if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) || MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { + continue; + } + + MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); + } else if (MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { + mJoinSetDone(pOperator); + break; + } + + do { + if (probeTs == buildTs) { + pCtx->lastEqTs = probeTs; + MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + + if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) || MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { + break; + } + + MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); + MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); + continue; + } + + if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) { + if (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) { + MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); + continue; + } + } else { + if (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { + MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build); + continue; + } + } + + break; + } while (true); + } while (true); + +_return: + + if (code) { + pJoin->errCode = code; + return NULL; + } + + return pCtx->finBlk; +} + + +static FORCE_INLINE int32_t mAntiJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { + if (pCtx->lastEqGrp) { + return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx); + } + + return mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true); +} + +static int32_t mAntiJoinHashFullCart(SMJoinMergeCtx* pCtx) { + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; + SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); + size_t bufLen = 0; + int32_t probeEndIdx = probeGrp->endIdx; + + for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) { + if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) { + probeGrp->endIdx = probeGrp->readIdx; + MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true)); + probeGrp->endIdx = probeEndIdx; + continue; + } + + void* pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen); + if (NULL == pGrp) { + probeGrp->endIdx = probeGrp->readIdx; + MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true)); + probeGrp->endIdx = probeEndIdx; + } + } + + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + + return TSDB_CODE_SUCCESS; +} + + +static int32_t mAntiJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, SMJoinGrpRows* probeGrp) { + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; + + do { + blockDataCleanup(pCtx->midBlk); + + mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build); + + if (pCtx->midBlk->info.rows > 0) { + MJ_ERR_RET(mJoinFilterAndNoKeepRows(pCtx->midBlk, pCtx->pJoin->pPreFilter)); + } + + if (pCtx->midBlk->info.rows) { + break; + } + + if (build->grpRowIdx < 0) { + MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true)); + break; + } + + continue; + } while (true); + + return TSDB_CODE_SUCCESS; +} + + +static int32_t mAntiJoinHashSeqCart(SMJoinMergeCtx* pCtx) { + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; + SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0); + size_t bufLen = 0; + int32_t probeEndIdx = probeGrp->endIdx; + + for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); probeGrp->readIdx++) { + if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) { + probeGrp->endIdx = probeGrp->readIdx; + MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true)); + probeGrp->endIdx = probeEndIdx; + continue; + } + + void* pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen); + if (NULL == pGrp) { + probeGrp->endIdx = probeGrp->readIdx; + MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true)); + probeGrp->endIdx = probeEndIdx; + continue; + } + + build->pHashCurGrp = *(SArray**)pGrp; + build->grpRowIdx = 0; + + probeGrp->endIdx = probeGrp->readIdx; + MJ_ERR_RET(mAntiJoinHashGrpCartFilter(pCtx, probeGrp)); + probeGrp->endIdx = probeEndIdx; + } + + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + + return TSDB_CODE_SUCCESS; +} + +static int32_t mAntiJoinMergeFullCart(SMJoinMergeCtx* pCtx) { + return TSDB_CODE_SUCCESS; +} + +static int32_t mAntiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; + SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); + SMJoinGrpRows* buildGrp = NULL; + int32_t buildGrpNum = taosArrayGetSize(build->eqGrps); + int32_t probeEndIdx = probeGrp->endIdx; + int32_t rowsLeft = pCtx->midBlk->info.capacity; + + do { + for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); + ++probeGrp->readIdx, probeGrp->endIdx = probeEndIdx, build->grpIdx = 0) { + probeGrp->endIdx = probeGrp->readIdx; + + rowsLeft = pCtx->midBlk->info.capacity; + + blockDataCleanup(pCtx->midBlk); + for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) { + buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); + if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) { + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp)); + rowsLeft -= GRP_REMAIN_ROWS(buildGrp); + buildGrp->readIdx = buildGrp->beginIdx; + continue; + } + + int32_t buildEndIdx = buildGrp->endIdx; + buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1; + ASSERT(buildGrp->endIdx >= buildGrp->readIdx); + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp)); + buildGrp->readIdx += rowsLeft; + buildGrp->endIdx = buildEndIdx; + rowsLeft = 0; + break; + } + + if (pCtx->midBlk->info.rows > 0) { + MJ_ERR_RET(mJoinFilterAndNoKeepRows(pCtx->midBlk, pCtx->pJoin->pFPreFilter)); + } + + if (pCtx->midBlk->info.rows > 0) { + if (build->grpIdx < buildGrpNum) { + buildGrp->readIdx = buildGrp->beginIdx; + } + + continue; + } + + if (build->grpIdx >= buildGrpNum) { + MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true)); + continue; + } + + //need break + + probeGrp->endIdx = probeEndIdx; + break; + } + + if (GRP_DONE(probeGrp) || BLK_IS_FULL(pCtx->finBlk)) { + break; + } + } while (true); + + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + + return TSDB_CODE_SUCCESS; +} + + +static int32_t mAntiJoinHashCart(SMJoinMergeCtx* pCtx) { + return (NULL == pCtx->pJoin->pPreFilter) ? mAntiJoinHashFullCart(pCtx) : mAntiJoinHashSeqCart(pCtx); +} + +static int32_t mAntiJoinMergeCart(SMJoinMergeCtx* pCtx) { + return (NULL == pCtx->pJoin->pFPreFilter) ? mAntiJoinMergeFullCart(pCtx) : mAntiJoinMergeSeqCart(pCtx); +} + +SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) { + SMJoinOperatorInfo* pJoin = pOperator->info; + SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; + int32_t code = TSDB_CODE_SUCCESS; + int64_t probeTs = 0; + int64_t buildTs = 0; + SColumnInfoData* pBuildCol = NULL; + SColumnInfoData* pProbeCol = NULL; + + blockDataCleanup(pCtx->finBlk); + + if (pCtx->grpRemains) { + MJ_ERR_JRET(mAntiJoinHandleGrpRemains(pCtx)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + pCtx->grpRemains = false; + } + + do { + if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) { + break; + } + + MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); + MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); + + if (probeTs == pCtx->lastEqTs) { + MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + + if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) { + continue; + } else { + MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); + } + } + + while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { + if (probeTs == buildTs) { + pCtx->lastEqTs = probeTs; + MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + + MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); + MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); + } else if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) { + MJ_ERR_JRET(mJoinProcessUnreachGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + } else { + while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { + MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build); + if (PROBE_TS_OVER(pCtx->ascTs, probeTs, buildTs)) { + continue; + } + + break; + } + } + } + + if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) { + pCtx->probeNEqGrp.blk = pJoin->probe->blk; + pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx; + pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx; + pCtx->probeNEqGrp.endIdx = pJoin->probe->blk->info.rows - 1; + + pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; + + MJ_ERR_JRET(mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + } + } while (true); + +_return: + + if (code) { + pJoin->errCode = code; + return NULL; + } + + return pCtx->finBlk; +} + +int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { + return TSDB_CODE_SUCCESS; +} + int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; @@ -1151,7 +1737,7 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity); } - pCtx->blkThreshold = pCtx->finBlk->info.capacity * 0.5; + pCtx->blkThreshold = pCtx->finBlk->info.capacity * 0.9; switch (pJoin->joinType) { case JOIN_TYPE_INNER: @@ -1159,10 +1745,25 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ pCtx->mergeCartFp = (joinCartFp)mInnerJoinMergeCart; break; case JOIN_TYPE_LEFT: - case JOIN_TYPE_RIGHT: - pCtx->hashCartFp = (joinCartFp)mLeftJoinHashCart; - pCtx->mergeCartFp = (joinCartFp)mLeftJoinMergeCart; + case JOIN_TYPE_RIGHT: { + switch (pJoin->subType) { + case JOIN_STYPE_OUTER: + pCtx->hashCartFp = (joinCartFp)mLeftJoinHashCart; + pCtx->mergeCartFp = (joinCartFp)mLeftJoinMergeCart; + break; + case JOIN_STYPE_SEMI: + pCtx->hashCartFp = (joinCartFp)mSemiJoinHashCart; + pCtx->mergeCartFp = (joinCartFp)mSemiJoinMergeCart; + break; + case JOIN_STYPE_ANTI: + pCtx->hashCartFp = (joinCartFp)mAntiJoinHashCart; + pCtx->mergeCartFp = (joinCartFp)mAntiJoinMergeCart; + break; + default: + break; + } break; + } case JOIN_TYPE_FULL: pCtx->hashCartFp = (joinCartFp)mFullJoinHashCart; pCtx->mergeCartFp = (joinCartFp)mFullJoinMergeCart; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 4b5cca0ad7..5a67018a74 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -28,6 +28,155 @@ #include "mergejoin.h" +void mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) { + // int32_t totalRows = pBlock->info.rows; + int32_t bmLen = BitmapLen(totalRows); + char* pBitmap = NULL; + int32_t maxRows = 0; + size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); + // it is a reserved column for scalar function, and no data in this column yet. + if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) { + continue; + } + + int32_t numOfRows = 0; + if (IS_VAR_DATA_TYPE(pDst->info.type)) { + int32_t j = 0; + pDst->varmeta.length = 0; + + while (j < totalRows) { + if (pBoolList[j] == 0) { + j += 1; + continue; + } + + if (colDataIsNull_var(pDst, j)) { + colDataSetNull_var(pDst, numOfRows); + } else { + // fix address sanitizer error. p1 may point to memory that will change during realloc of colDataSetVal, first + // copy it to p2 + char* p1 = colDataGetVarData(pDst, j); + int32_t len = 0; + if (pDst->info.type == TSDB_DATA_TYPE_JSON) { + len = getJsonValueLen(p1); + } else { + len = varDataTLen(p1); + } + char* p2 = taosMemoryMalloc(len); + memcpy(p2, p1, len); + colDataSetVal(pDst, numOfRows, p2, false); + taosMemoryFree(p2); + } + numOfRows += 1; + j += 1; + break; + } + + if (maxRows < numOfRows) { + maxRows = numOfRows; + } + } else { + if (pBitmap == NULL) { + pBitmap = taosMemoryCalloc(1, bmLen); + } + + memcpy(pBitmap, pDst->nullbitmap, bmLen); + memset(pDst->nullbitmap, 0, bmLen); + + int32_t j = 0; + + switch (pDst->info.type) { + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UBIGINT: + case TSDB_DATA_TYPE_DOUBLE: + case TSDB_DATA_TYPE_TIMESTAMP: + while (j < totalRows) { + if (pBoolList[j] == 0) { + j += 1; + continue; + } + + if (colDataIsNull_f(pBitmap, j)) { + colDataSetNull_f(pDst->nullbitmap, numOfRows); + } else { + ((int64_t*)pDst->pData)[numOfRows] = ((int64_t*)pDst->pData)[j]; + } + numOfRows += 1; + j += 1; + break; + } + break; + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: + while (j < totalRows) { + if (pBoolList[j] == 0) { + j += 1; + continue; + } + if (colDataIsNull_f(pBitmap, j)) { + colDataSetNull_f(pDst->nullbitmap, numOfRows); + } else { + ((int32_t*)pDst->pData)[numOfRows] = ((int32_t*)pDst->pData)[j]; + } + numOfRows += 1; + j += 1; + break; + } + break; + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_USMALLINT: + while (j < totalRows) { + if (pBoolList[j] == 0) { + j += 1; + continue; + } + if (colDataIsNull_f(pBitmap, j)) { + colDataSetNull_f(pDst->nullbitmap, numOfRows); + } else { + ((int16_t*)pDst->pData)[numOfRows] = ((int16_t*)pDst->pData)[j]; + } + numOfRows += 1; + j += 1; + break; + } + break; + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_UTINYINT: + while (j < totalRows) { + if (pBoolList[j] == 0) { + j += 1; + continue; + } + if (colDataIsNull_f(pBitmap, j)) { + colDataSetNull_f(pDst->nullbitmap, numOfRows); + } else { + ((int8_t*)pDst->pData)[numOfRows] = ((int8_t*)pDst->pData)[j]; + } + numOfRows += 1; + j += 1; + break; + } + break; + } + } + + if (maxRows < numOfRows) { + maxRows = numOfRows; + } + } + + pBlock->info.rows = maxRows; + if (pBitmap != NULL) { + taosMemoryFree(pBitmap); + } +} + + int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startRowIdx) { if (pFilterInfo == NULL || pBlock->info.rows == 0) { return TSDB_CODE_SUCCESS; @@ -138,6 +287,71 @@ _err: return code; } +int32_t mJoinFilterAndKeepSingleRow(SSDataBlock* pBlock, SFilterInfo* pFilterInfo) { + if (pFilterInfo == NULL || pBlock->info.rows == 0) { + return TSDB_CODE_SUCCESS; + } + + SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; + SColumnInfoData* p = NULL; + + int32_t code = filterSetDataFromSlotId(pFilterInfo, ¶m1); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + int32_t status = 0; + code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + if (status == FILTER_RESULT_ALL_QUALIFIED) { + pBlock->info.rows = 1; + } else if (status == FILTER_RESULT_NONE_QUALIFIED) { + pBlock->info.rows = 0; + } else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) { + mJoinTrimKeepOneRow(pBlock, pBlock->info.rows, (bool*)p->pData); + } + + code = TSDB_CODE_SUCCESS; + +_err: + colDataDestroy(p); + taosMemoryFree(p); + return code; +} + +int32_t mJoinFilterAndNoKeepRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo) { + if (pFilterInfo == NULL || pBlock->info.rows == 0) { + return TSDB_CODE_SUCCESS; + } + + SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; + SColumnInfoData* p = NULL; + + int32_t code = filterSetDataFromSlotId(pFilterInfo, ¶m1); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + int32_t status = 0; + code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + if (status == FILTER_RESULT_NONE_QUALIFIED) { + pBlock->info.rows = 0; + } + + code = TSDB_CODE_SUCCESS; + +_err: + colDataDestroy(p); + taosMemoryFree(p); + return code; +} int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) { @@ -545,6 +759,12 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi return TSDB_CODE_OUT_OF_MEMORY; } } + + pTable->noKeepEqGrpRows = (JOIN_STYPE_ANTI == pJoin->subType && NULL == pJoin->pFPreFilter); + pTable->multiEqGrpRows = !((JOIN_STYPE_SEMI == pJoin->subType || JOIN_STYPE_ANTI == pJoin->subType) && NULL == pJoin->pFPreFilter); + pTable->multiRowsGrp = !((JOIN_STYPE_SEMI == pJoin->subType || JOIN_STYPE_ANTI == pJoin->subType) && NULL == pJoin->pPreFilter); + } else { + pTable->multiEqGrpRows = true; } return TSDB_CODE_SUCCESS; @@ -586,16 +806,11 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin } static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { -#if 0 - pJoin->joinFps = &gMJoinFps[pJoin->joinType][pJoin->subType]; - - int32_t code = (*pJoin->joinFps->initJoinCtx)(pOperator, pJoin); - if (code) { - return code; + if (JOIN_STYPE_ASOF == pJoin->subType || JOIN_STYPE_WIN == pJoin->subType) { + //return mJoinInitWindowCtx(pJoin, pJoinNode); } -#else + return mJoinInitMergeCtx(pJoin, pJoinNode); -#endif } void mJoinSetDone(SOperatorInfo* pOperator) { @@ -675,7 +890,7 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); SMJoinGrpRows* pGrp = NULL; - if (*(int64_t*)colDataGetData(pCol, pTable->blkRowIdx) != timestamp) { + if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) { return TSDB_CODE_SUCCESS; } @@ -690,22 +905,46 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol pGrp->endIdx = pGrp->beginIdx; pGrp->readMatch = false; pGrp->blk = pTable->blk; - - for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) { - char* pNextVal = colDataGetData(pCol, pTable->blkRowIdx); - if (timestamp == *(int64_t*)pNextVal) { - pGrp->endIdx++; - continue; - } - goto _return; + char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1); + if (timestamp == *(int64_t*)pEndVal) { + if (pTable->multiEqGrpRows) { + pGrp->endIdx = pTable->blk->info.rows - 1; + } else { + pGrp->endIdx = pGrp->beginIdx; + } + + pTable->blkRowIdx = pTable->blk->info.rows; + } else { + for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) { + char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx); + if (timestamp == *(int64_t*)pNextVal) { + pGrp->endIdx++; + continue; + } + + if (!pTable->multiEqGrpRows) { + pGrp->endIdx = pGrp->beginIdx; + } + + goto _return; + } } - if (wholeBlk) { + if (wholeBlk && (pTable->multiEqGrpRows || restart)) { *wholeBlk = true; - if (0 == pGrp->beginIdx) { + + if (pTable->noKeepEqGrpRows) { + goto _return; + } + + if (0 == pGrp->beginIdx && pTable->multiEqGrpRows) { pGrp->blk = createOneDataBlock(pTable->blk, true); } else { + if (!pTable->multiEqGrpRows) { + pGrp->endIdx = pGrp->beginIdx; + } + pGrp->blk = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, pGrp->endIdx - pGrp->beginIdx + 1); pGrp->endIdx -= pGrp->beginIdx; pGrp->beginIdx = 0; @@ -716,8 +955,12 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol _return: - pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1; - + if (pTable->noKeepEqGrpRows || (!pTable->multiEqGrpRows && !restart)) { + taosArrayPop(pTable->eqGrps); + } else { + pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1; + } + return TSDB_CODE_SUCCESS; } @@ -835,7 +1078,7 @@ static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDat taosArrayPush(pNewGrp, &pos); tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, POINTER_BYTES); - } else { + } else if (pBuild->multiRowsGrp) { taosArrayPush(*pGrpRows, &pos); } @@ -1076,9 +1319,22 @@ int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) { pJoin->joinFp = mInnerJoinDo; break; case JOIN_TYPE_LEFT: - case JOIN_TYPE_RIGHT: - pJoin->joinFp = mLeftJoinDo; + case JOIN_TYPE_RIGHT: { + switch (pJoin->subType) { + case JOIN_STYPE_OUTER: + pJoin->joinFp = mLeftJoinDo; + break; + case JOIN_STYPE_SEMI: + pJoin->joinFp = mSemiJoinDo; + break; + case JOIN_STYPE_ANTI: + pJoin->joinFp = mAntiJoinDo; + break; + default: + break; + } break; + } case JOIN_TYPE_FULL: pJoin->joinFp = mFullJoinDo; break; diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index 4e4658c39b..b4fa6609f6 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -66,13 +66,17 @@ enum { }; #define COL_DISPLAY_WIDTH 18 -#define JT_MAX_LOOP 1000 +#define JT_MAX_LOOP 3000 #define LEFT_BLK_ID 0 #define RIGHT_BLK_ID 1 #define RES_BLK_ID 2 #define MAX_SLOT_NUM 4 +#define LEFT_TABLE_COLS 0x1 +#define RIGHT_TABLE_COLS 0x2 +#define ALL_TABLE_COLS (LEFT_TABLE_COLS | RIGHT_TABLE_COLS) + #define JT_KEY_SOLT_ID (MAX_SLOT_NUM - 1) int32_t jtInputColType[MAX_SLOT_NUM] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_BIGINT}; @@ -151,6 +155,8 @@ typedef struct { SArray* rightBlkList; int64_t resRows; + bool leftColOnly; + bool rightColOnly; SSHashObj* jtResRows; SOperatorInfo* pJoinOp; @@ -191,6 +197,12 @@ void printResRow(char* value, int32_t type) { printf(" "); for (int32_t i = 0; i < jtCtx.resColNum; ++i) { int32_t slot = jtCtx.resColInSlot[i]; + if (0 == type && ((jtCtx.leftColOnly && slot >= MAX_SLOT_NUM) || + (jtCtx.rightColOnly && slot < MAX_SLOT_NUM))) { + printf("%18s", " "); + continue; + } + if (*(bool*)(value + slot)) { printf("%18s", " NULL"); continue; @@ -211,15 +223,15 @@ void printResRow(char* value, int32_t type) { printf("\t %s\n", 0 == type ? "-" : (1 == type ? "+" : "")); } -void pushResRow() { +void pushResRow(char* buf, int32_t size) { jtCtx.resRows++; - int32_t* rows = (int32_t*)tSimpleHashGet(jtCtx.jtResRows, jtCtx.resColBuf, jtCtx.resColSize); + int32_t* rows = (int32_t*)tSimpleHashGet(jtCtx.jtResRows, buf, size); if (rows) { (*rows)++; } else { int32_t n = 1; - tSimpleHashPut(jtCtx.jtResRows, jtCtx.resColBuf, jtCtx.resColSize, &n, sizeof(n)); + tSimpleHashPut(jtCtx.jtResRows, buf, size, &n, sizeof(n)); } } @@ -605,14 +617,32 @@ void createFilterStart(SSortMergeJoinPhysiNode* p, bool filter) { memset(jtCtx.rightFilterColList, 0, sizeof(jtCtx.rightFilterColList)); return; } - - jtCtx.leftFilterNum = taosRand() % (MAX_SLOT_NUM + 1); - if (0 == jtCtx.leftFilterNum) { - do { - jtCtx.rightFilterNum = taosRand() % (MAX_SLOT_NUM + 1); - } while (0 == jtCtx.rightFilterNum); - } else { + + if ((JOIN_STYPE_SEMI == jtCtx.subType || JOIN_STYPE_ANTI == jtCtx.subType) && JOIN_TYPE_LEFT == jtCtx.joinType) { + jtCtx.rightFilterNum = 0; + jtCtx.leftFilterNum = taosRand() % (MAX_SLOT_NUM + 1); + if (0 == jtCtx.leftFilterNum) { + do { + jtCtx.leftFilterNum = taosRand() % (MAX_SLOT_NUM + 1); + } while (0 == jtCtx.leftFilterNum); + } + } else if ((JOIN_STYPE_SEMI == jtCtx.subType || JOIN_STYPE_ANTI == jtCtx.subType) && JOIN_TYPE_RIGHT == jtCtx.joinType) { + jtCtx.leftFilterNum = 0; jtCtx.rightFilterNum = taosRand() % (MAX_SLOT_NUM + 1); + if (0 == jtCtx.rightFilterNum) { + do { + jtCtx.rightFilterNum = taosRand() % (MAX_SLOT_NUM + 1); + } while (0 == jtCtx.rightFilterNum); + } + } else { + jtCtx.leftFilterNum = taosRand() % (MAX_SLOT_NUM + 1); + if (0 == jtCtx.leftFilterNum) { + do { + jtCtx.rightFilterNum = taosRand() % (MAX_SLOT_NUM + 1); + } while (0 == jtCtx.rightFilterNum); + } else { + jtCtx.rightFilterNum = taosRand() % (MAX_SLOT_NUM + 1); + } } int32_t idx = 0; @@ -645,6 +675,8 @@ void createFilterEnd(SSortMergeJoinPhysiNode* p, bool filter) { SLogicConditionNode* pLogic = NULL; if ((jtCtx.leftFilterNum + jtCtx.rightFilterNum) > 1) { pLogic = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); + pLogic->node.resType.type = TSDB_DATA_TYPE_BOOL; + pLogic->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; pLogic->condType = LOGIC_COND_TYPE_AND; } @@ -664,6 +696,8 @@ void createFilterEnd(SSortMergeJoinPhysiNode* p, bool filter) { SOperatorNode* pOp = (SOperatorNode*)nodesMakeNode(QUERY_NODE_OPERATOR); pOp->opType = OP_TYPE_GREATER_THAN; + pOp->node.resType.type = TSDB_DATA_TYPE_BOOL; + pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; pOp->pLeft = (SNode*)pCol; pOp->pRight = (SNode*)pVal; @@ -692,6 +726,8 @@ void createFilterEnd(SSortMergeJoinPhysiNode* p, bool filter) { SOperatorNode* pOp = (SOperatorNode*)nodesMakeNode(QUERY_NODE_OPERATOR); pOp->opType = OP_TYPE_GREATER_THAN; + pOp->node.resType.type = TSDB_DATA_TYPE_BOOL; + pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; pOp->pLeft = (SNode*)pCol; pOp->pRight = (SNode*)pVal; @@ -748,6 +784,8 @@ SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(EJoinType type, EJoin jtCtx.joinType = type; jtCtx.subType = sub; jtCtx.asc = asc; + jtCtx.leftColOnly = (JOIN_TYPE_LEFT == type && JOIN_STYPE_SEMI == sub); + jtCtx.rightColOnly = (JOIN_TYPE_RIGHT == type && JOIN_STYPE_SEMI == sub); createColCond(p, cond); createFilterStart(p, filter); @@ -896,7 +934,7 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { } } - pushResRow(); + pushResRow(jtCtx.resColBuf, jtCtx.resColSize); } (*ppBlk)->info.rows++; @@ -996,36 +1034,153 @@ void putNMatchRowToRes(char* lrow, int32_t tableOffset, int32_t peerOffset) { } } - pushResRow(); + pushResRow(jtCtx.resColBuf, jtCtx.resColSize); } -void putMatchRowToRes(char* lrow, char* rrow) { +void putMatchRowToRes(char* lrow, char* rrow, int32_t cols) { memset(jtCtx.resColBuf, 0, jtCtx.resColSize); - - for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { - if (jtCtx.resColList[c]) { - if (*(bool*)(lrow + c)) { - *(bool*)(jtCtx.resColBuf + c) = true; - } else { - memcpy(jtCtx.resColBuf + jtCtx.resColOffset[c], lrow + jtCtx.colRowOffset[c], tDataTypes[jtInputColType[c]].bytes); + + if (cols & LEFT_TABLE_COLS) { + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + if (jtCtx.resColList[c]) { + if (*(bool*)(lrow + c)) { + *(bool*)(jtCtx.resColBuf + c) = true; + } else { + memcpy(jtCtx.resColBuf + jtCtx.resColOffset[c], lrow + jtCtx.colRowOffset[c], tDataTypes[jtInputColType[c]].bytes); + } } } } - for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { - if (jtCtx.resColList[MAX_SLOT_NUM + c]) { - if (*(bool*)(rrow + c)) { - *(bool*)(jtCtx.resColBuf + MAX_SLOT_NUM + c) = true; - } else { - memcpy(jtCtx.resColBuf + jtCtx.resColOffset[MAX_SLOT_NUM + c], rrow + jtCtx.colRowOffset[c], tDataTypes[jtInputColType[c]].bytes); + if (cols & RIGHT_TABLE_COLS) { + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + if (jtCtx.resColList[MAX_SLOT_NUM + c]) { + if (*(bool*)(rrow + c)) { + *(bool*)(jtCtx.resColBuf + MAX_SLOT_NUM + c) = true; + } else { + memcpy(jtCtx.resColBuf + jtCtx.resColOffset[MAX_SLOT_NUM + c], rrow + jtCtx.colRowOffset[c], tDataTypes[jtInputColType[c]].bytes); + } } } } - pushResRow(); + pushResRow(jtCtx.resColBuf, jtCtx.resColSize); } + +void innerJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { + bool leftMatch = false, rightMatch = false, filterOut = false; + void* lValue = NULL, *rValue = NULL, *filterValue = NULL; + int64_t lBig = 0, rBig = 0, fbig = 0; + int64_t rightTbOffset = jtCtx.blkRowSize * leftGrpRows; + + for (int32_t l = 0; l < leftGrpRows; ++l) { + char* lrow = jtCtx.colRowDataBuf + jtCtx.blkRowSize * l; + + filterOut = false; + leftMatch = true; + + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + lValue = lrow + jtCtx.colRowOffset[c]; + switch (jtInputColType[c]) { + case TSDB_DATA_TYPE_TIMESTAMP: + fbig = TIMESTAMP_FILTER_VALUE; + lBig = *(int64_t*)lValue; + break; + case TSDB_DATA_TYPE_INT: + fbig = INT_FILTER_VALUE; + lBig = *(int32_t*)lValue; + break; + case TSDB_DATA_TYPE_BIGINT: + fbig = BIGINT_FILTER_VALUE; + lBig = *(int64_t*)lValue; + break; + default: + break; + } + + if (jtCtx.leftFilterNum && jtCtx.leftFilterColList[c] && ((*(bool*)(lrow + c)) || lBig <= fbig)) { + filterOut = true; + break; + } + + if (jtCtx.colEqNum && jtCtx.colEqList[c] && (*(bool*)(lrow + c))) { + leftMatch = false; + break; + } + + if (jtCtx.colOnNum && jtCtx.colOnList[c] && (*(bool*)(lrow + c))) { + leftMatch = false; + break; + } + } + + if (filterOut || !leftMatch) { + continue; + } + + for (int32_t r = 0; r < rightGrpRows; ++r) { + char* rrow = jtCtx.colRowDataBuf + rightTbOffset + jtCtx.blkRowSize * r; + rightMatch = true; + filterOut = false; + + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + lValue = lrow + jtCtx.colRowOffset[c]; + + if (!*(bool*)(rrow + c)) { + rValue = rrow + jtCtx.colRowOffset[c]; + } + + switch (jtInputColType[c]) { + case TSDB_DATA_TYPE_TIMESTAMP: + fbig = TIMESTAMP_FILTER_VALUE; + lBig = *(int64_t*)lValue; + rBig = *(int64_t*)rValue; + break; + case TSDB_DATA_TYPE_INT: + fbig = INT_FILTER_VALUE; + lBig = *(int32_t*)lValue; + rBig = *(int32_t*)rValue; + break; + case TSDB_DATA_TYPE_BIGINT: + fbig = BIGINT_FILTER_VALUE; + lBig = *(int64_t*)lValue; + rBig = *(int64_t*)rValue; + break; + default: + break; + } + + if (jtCtx.colEqNum && jtCtx.colEqList[c] && ((*(bool*)(rrow + c)) || lBig != rBig)) { + rightMatch = false; + break; + } + + if (jtCtx.colOnNum && jtCtx.colOnList[c] && ((*(bool*)(rrow + c)) || lBig <= rBig)) { + rightMatch = false; + break; + } + + if (jtCtx.rightFilterNum && jtCtx.rightFilterColList[c] && ((*(bool*)(rrow + c)) || rBig <= fbig)) { + filterOut = true; + break; + } + } + + if (filterOut || !rightMatch) { + continue; + } + + putMatchRowToRes(lrow, rrow, ALL_TABLE_COLS); + } + } + + +} + + + void leftJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { bool leftMatch = false, rightMatch = false, filterOut = false; void* lValue = NULL, *rValue = NULL, *filterValue = NULL; @@ -1065,6 +1220,10 @@ void leftJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { if (jtCtx.colEqNum && jtCtx.colEqList[c] && (*(bool*)(lrow + c))) { leftMatch = false; } + + if (jtCtx.colOnNum && jtCtx.colOnList[c] && (*(bool*)(lrow + c))) { + leftMatch = false; + } } if (filterOut) { @@ -1135,7 +1294,7 @@ void leftJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { } if (rightMatch) { - putMatchRowToRes(lrow, rrow); + putMatchRowToRes(lrow, rrow, ALL_TABLE_COLS); } } @@ -1148,6 +1307,240 @@ void leftJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { } + +void semiJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { + bool leftMatch = false, rightMatch = false, filterOut = false; + void* lValue = NULL, *rValue = NULL, *filterValue = NULL; + int64_t lBig = 0, rBig = 0, fbig = 0; + int64_t leftTbOffset = 0; + int64_t rightTbOffset = jtCtx.blkRowSize * leftGrpRows; + char* rrow = NULL; + + for (int32_t l = 0; l < leftGrpRows; ++l) { + char* lrow = jtCtx.colRowDataBuf + leftTbOffset + jtCtx.blkRowSize * l; + + filterOut = false; + leftMatch = true; + + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + lValue = lrow + jtCtx.colRowOffset[c]; + switch (jtInputColType[c]) { + case TSDB_DATA_TYPE_TIMESTAMP: + fbig = TIMESTAMP_FILTER_VALUE; + lBig = *(int64_t*)lValue; + break; + case TSDB_DATA_TYPE_INT: + fbig = INT_FILTER_VALUE; + lBig = *(int32_t*)lValue; + break; + case TSDB_DATA_TYPE_BIGINT: + fbig = BIGINT_FILTER_VALUE; + lBig = *(int64_t*)lValue; + break; + default: + break; + } + + if (jtCtx.leftFilterNum && jtCtx.leftFilterColList[c] && ((*(bool*)(lrow + c)) || lBig <= fbig)) { + filterOut = true; + break; + } + + if (jtCtx.colEqNum && jtCtx.colEqList[c] && (*(bool*)(lrow + c))) { + leftMatch = false; + break; + } + + if (jtCtx.colOnNum && jtCtx.colOnList[c] && (*(bool*)(lrow + c))) { + leftMatch = false; + break; + } + } + + if (filterOut || !leftMatch) { + continue; + } + + for (int32_t r = 0; r < rightGrpRows; ++r) { + rrow = jtCtx.colRowDataBuf + rightTbOffset + jtCtx.blkRowSize * r; + rightMatch = true; + filterOut = false; + + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + lValue = lrow + jtCtx.colRowOffset[c]; + + if (!*(bool*)(rrow + c)) { + rValue = rrow + jtCtx.colRowOffset[c]; + } + + switch (jtInputColType[c]) { + case TSDB_DATA_TYPE_TIMESTAMP: + fbig = TIMESTAMP_FILTER_VALUE; + lBig = *(int64_t*)lValue; + rBig = *(int64_t*)rValue; + break; + case TSDB_DATA_TYPE_INT: + fbig = INT_FILTER_VALUE; + lBig = *(int32_t*)lValue; + rBig = *(int32_t*)rValue; + break; + case TSDB_DATA_TYPE_BIGINT: + fbig = BIGINT_FILTER_VALUE; + lBig = *(int64_t*)lValue; + rBig = *(int64_t*)rValue; + break; + default: + break; + } + + if (jtCtx.colEqNum && jtCtx.colEqList[c] && ((*(bool*)(rrow + c)) || lBig != rBig)) { + rightMatch = false; + break; + } + + if (jtCtx.colOnNum && jtCtx.colOnList[c] && ((*(bool*)(rrow + c)) || lBig <= rBig)) { + rightMatch = false; + break; + } + + if (jtCtx.rightFilterNum && jtCtx.rightFilterColList[c] && ((*(bool*)(rrow + c)) || rBig <= fbig)) { + filterOut = true; + break; + } + } + + if (filterOut || !rightMatch) { + continue; + } + + break; + } + + if (!filterOut && rightMatch) { + putMatchRowToRes(lrow, rrow, LEFT_TABLE_COLS); + } + } + + +} + + + + +void antiJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { + bool leftMatch = false, rightMatch = false, filterOut = false; + void* lValue = NULL, *rValue = NULL, *filterValue = NULL; + int64_t lBig = 0, rBig = 0, fbig = 0; + int64_t rightTbOffset = jtCtx.blkRowSize * leftGrpRows; + + ASSERT(0 == jtCtx.rightFilterNum); + + for (int32_t l = 0; l < leftGrpRows; ++l) { + char* lrow = jtCtx.colRowDataBuf + jtCtx.blkRowSize * l; + + filterOut = false; + leftMatch = true; + + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + lValue = lrow + jtCtx.colRowOffset[c]; + switch (jtInputColType[c]) { + case TSDB_DATA_TYPE_TIMESTAMP: + fbig = TIMESTAMP_FILTER_VALUE; + lBig = *(int64_t*)lValue; + break; + case TSDB_DATA_TYPE_INT: + fbig = INT_FILTER_VALUE; + lBig = *(int32_t*)lValue; + break; + case TSDB_DATA_TYPE_BIGINT: + fbig = BIGINT_FILTER_VALUE; + lBig = *(int64_t*)lValue; + break; + default: + break; + } + + if (jtCtx.leftFilterNum && jtCtx.leftFilterColList[c] && ((*(bool*)(lrow + c)) || lBig <= fbig)) { + filterOut = true; + break; + } + + if (jtCtx.colEqNum && jtCtx.colEqList[c] && (*(bool*)(lrow + c))) { + leftMatch = false; + } + + if (jtCtx.colOnNum && jtCtx.colOnList[c] && (*(bool*)(lrow + c))) { + leftMatch = false; + } + } + + if (filterOut) { + continue; + } + + if (false == leftMatch) { + putNMatchRowToRes(lrow, 0, MAX_SLOT_NUM); + continue; + } + + leftMatch = false; + for (int32_t r = 0; r < rightGrpRows; ++r) { + char* rrow = jtCtx.colRowDataBuf + rightTbOffset + jtCtx.blkRowSize * r; + rightMatch = true; + + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + lValue = lrow + jtCtx.colRowOffset[c]; + + if (!*(bool*)(rrow + c)) { + rValue = rrow + jtCtx.colRowOffset[c]; + } + + switch (jtInputColType[c]) { + case TSDB_DATA_TYPE_TIMESTAMP: + fbig = TIMESTAMP_FILTER_VALUE; + lBig = *(int64_t*)lValue; + rBig = *(int64_t*)rValue; + break; + case TSDB_DATA_TYPE_INT: + fbig = INT_FILTER_VALUE; + lBig = *(int32_t*)lValue; + rBig = *(int32_t*)rValue; + break; + case TSDB_DATA_TYPE_BIGINT: + fbig = BIGINT_FILTER_VALUE; + lBig = *(int64_t*)lValue; + rBig = *(int64_t*)rValue; + break; + default: + break; + } + + if (jtCtx.colEqNum && jtCtx.colEqList[c] && ((*(bool*)(rrow + c)) || lBig != rBig)) { + rightMatch = false; + break; + } + + if (jtCtx.colOnNum && jtCtx.colOnList[c] && ((*(bool*)(rrow + c)) || lBig <= rBig)) { + rightMatch = false; + break; + } + } + + if (rightMatch) { + leftMatch = true; + break; + } + } + + if (!leftMatch) { + putNMatchRowToRes(lrow, 0, MAX_SLOT_NUM); + } + } + + +} + + void fullJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { bool leftMatch = false, rightMatch = false, lfilterOut = false, rfilterOut = false; void* lValue = NULL, *rValue = NULL, *filterValue = NULL; @@ -1243,7 +1636,7 @@ void fullJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { } if (!lfilterOut && rightMatch) { - putMatchRowToRes(lrow, rrow); + putMatchRowToRes(lrow, rrow, ALL_TABLE_COLS); leftMatch= true; } } @@ -1266,9 +1659,25 @@ void fullJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { void appendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { switch (jtCtx.joinType) { - case JOIN_TYPE_LEFT: - leftJoinAppendEqGrpRes(leftGrpRows, rightGrpRows); + case JOIN_TYPE_INNER: + innerJoinAppendEqGrpRes(leftGrpRows, rightGrpRows); break; + case JOIN_TYPE_LEFT: { + switch (jtCtx.subType) { + case JOIN_STYPE_OUTER: + leftJoinAppendEqGrpRes(leftGrpRows, rightGrpRows); + break; + case JOIN_STYPE_SEMI: + semiJoinAppendEqGrpRes(leftGrpRows, rightGrpRows); + break; + case JOIN_STYPE_ANTI: + antiJoinAppendEqGrpRes(leftGrpRows, rightGrpRows); + break; + default: + break; + } + break; + } case JOIN_TYPE_FULL: fullJoinAppendEqGrpRes(leftGrpRows, rightGrpRows); break; @@ -1629,6 +2038,37 @@ void checkJoinDone(char* caseName) { printf("\n%dth TEST [%s] Final Result: %s\n", jtCtx.loopIdx, caseName, jtRes.succeed ? "SUCCEED" : "FAILED"); } +void putRowToResColBuf(SSDataBlock* pBlock, int32_t r, bool ignoreTbCols) { + for (int32_t c = 0; c < jtCtx.resColNum; ++c) { + int32_t slot = jtCtx.resColInSlot[c]; + if (ignoreTbCols && ((jtCtx.leftColOnly && slot >= MAX_SLOT_NUM) || + (jtCtx.rightColOnly && slot < MAX_SLOT_NUM))) { + continue; + } + + SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, c); + switch (jtInputColType[slot % MAX_SLOT_NUM]) { + case TSDB_DATA_TYPE_TIMESTAMP: + case TSDB_DATA_TYPE_BIGINT: + if (colDataIsNull_s(pCol, r)) { + *(bool*)(jtCtx.resColBuf + slot) = true; + } else { + *(int64_t*)(jtCtx.resColBuf + jtCtx.resColOffset[slot]) = *(int64_t*)colDataGetData(pCol, r); + } + break; + case TSDB_DATA_TYPE_INT: + if (colDataIsNull_s(pCol, r)) { + *(bool*)(jtCtx.resColBuf + slot) = true; + } else { + *(int32_t*)(jtCtx.resColBuf + jtCtx.resColOffset[slot]) = *(int32_t*)colDataGetData(pCol, r); + } + break; + default: + break; + } + } +} + void checkJoinRes(SSDataBlock* pBlock) { jtRes.rowNum += pBlock->info.rows; if (jtRes.rowNum > jtStat.maxResRows) { @@ -1643,42 +2083,23 @@ void checkJoinRes(SSDataBlock* pBlock) { jtStat.totalResRows += pBlock->info.rows; for (int32_t r = 0; r < pBlock->info.rows; ++r) { memset(jtCtx.resColBuf, 0, jtCtx.resColSize); - - for (int32_t c = 0; c < jtCtx.resColNum; ++c) { - int32_t slot = jtCtx.resColInSlot[c]; - SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, c); - switch (jtInputColType[slot % MAX_SLOT_NUM]) { - case TSDB_DATA_TYPE_TIMESTAMP: - case TSDB_DATA_TYPE_BIGINT: - if (colDataIsNull_s(pCol, r)) { - *(bool*)(jtCtx.resColBuf + slot) = true; - } else { - *(int64_t*)(jtCtx.resColBuf + jtCtx.resColOffset[slot]) = *(int64_t*)colDataGetData(pCol, r); - } - break; - case TSDB_DATA_TYPE_INT: - if (colDataIsNull_s(pCol, r)) { - *(bool*)(jtCtx.resColBuf + slot) = true; - } else { - *(int32_t*)(jtCtx.resColBuf + jtCtx.resColOffset[slot]) = *(int32_t*)colDataGetData(pCol, r); - } - break; - default: - break; - } - } + + putRowToResColBuf(pBlock, r, true); char* value = (char*)tSimpleHashGet(jtCtx.jtResRows, jtCtx.resColBuf, jtCtx.resColSize); if (NULL == value) { + putRowToResColBuf(pBlock, r, false); printResRow(jtCtx.resColBuf, 1); jtRes.succeed = false; jtRes.addRowNum++; continue; } + rmResRow(); + + putRowToResColBuf(pBlock, r, false); printResRow(jtCtx.resColBuf, 2); jtRes.matchNum++; - rmResRow(); } } @@ -1777,7 +2198,7 @@ void runSingleTest(char* caseName, SJoinTestParam* param) { bool contLoop = true; SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param->joinType, param->subType, param->cond, param->filter, param->asc); - createDummyBlkList(20000, 20000, 20000, 20000, 4096); + createDummyBlkList(20, 20, 20, 20, 3); while (contLoop) { rerunBlockedHere(); @@ -1809,6 +2230,109 @@ void handleCaseEnd() { } // namespace +#if 0 +#if 1 +TEST(innerJoin, noCondTest) { + SJoinTestParam param; + char* caseName = "innerJoin:noCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_INNER; + param.subType = JOIN_STYPE_NONE; + param.cond = TEST_NO_COND; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); +} +#endif + +#if 1 +TEST(innerJoin, eqCondTest) { + SJoinTestParam param; + char* caseName = "innerJoin:eqCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_INNER; + param.subType = JOIN_STYPE_NONE; + param.cond = TEST_EQ_COND; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); +} +#endif + +#if 1 +TEST(innerJoin, onCondTest) { + SJoinTestParam param; + char* caseName = "innerJoin:onCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_INNER; + param.subType = JOIN_STYPE_NONE; + param.cond = TEST_ON_COND; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); +} +#endif + +#if 1 +TEST(innerJoin, fullCondTest) { + SJoinTestParam param; + char* caseName = "innerJoin:fullCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_INNER; + param.subType = JOIN_STYPE_NONE; + param.cond = TEST_FULL_COND; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); +} +#endif +#endif + + #if 0 #if 1 TEST(leftOuterJoin, noCondTest) { @@ -1911,6 +2435,7 @@ TEST(leftOuterJoin, fullCondTest) { #endif #endif +#if 0 #if 1 TEST(fullOuterJoin, noCondTest) { SJoinTestParam param; @@ -2011,6 +2536,214 @@ TEST(fullOuterJoin, fullCondTest) { taosMemoryFree(pTask); } #endif +#endif + +#if 0 +#if 1 +TEST(leftSemiJoin, noCondTest) { + SJoinTestParam param; + char* caseName = "leftSemiJoin:noCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_LEFT; + param.subType = JOIN_STYPE_SEMI; + param.cond = TEST_NO_COND; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); +} +#endif + +#if 1 +TEST(leftSemiJoin, eqCondTest) { + SJoinTestParam param; + char* caseName = "leftSemiJoin:eqCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_LEFT; + param.subType = JOIN_STYPE_SEMI; + param.cond = TEST_EQ_COND; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); + handleCaseEnd(); +} +#endif + +#if 1 +TEST(leftSemiJoin, onCondTest) { + SJoinTestParam param; + char* caseName = "leftSemiJoin:onCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_LEFT; + param.subType = JOIN_STYPE_SEMI; + param.cond = TEST_ON_COND; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); +} +#endif + +#if 1 +TEST(leftSemiJoin, fullCondTest) { + SJoinTestParam param; + char* caseName = "leftSemiJoin:fullCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_LEFT; + param.subType = JOIN_STYPE_SEMI; + param.cond = TEST_FULL_COND; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); +} +#endif +#endif + +#if 1 +#if 0 +TEST(leftAntiJoin, noCondTest) { + SJoinTestParam param; + char* caseName = "leftAntiJoin:noCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_LEFT; + param.subType = JOIN_STYPE_ANTI; + param.cond = TEST_NO_COND; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); +} +#endif + +#if 1 +TEST(leftAntiJoin, eqCondTest) { + SJoinTestParam param; + char* caseName = "leftAntiJoin:eqCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_LEFT; + param.subType = JOIN_STYPE_ANTI; + param.cond = TEST_EQ_COND; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); + handleCaseEnd(); +} +#endif + +#if 1 +TEST(leftAntiJoin, onCondTest) { + SJoinTestParam param; + char* caseName = "leftAntiJoin:onCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_LEFT; + param.subType = JOIN_STYPE_ANTI; + param.cond = TEST_ON_COND; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); +} +#endif + +#if 1 +TEST(leftAntiJoin, fullCondTest) { + SJoinTestParam param; + char* caseName = "leftAntiJoin:fullCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_LEFT; + param.subType = JOIN_STYPE_ANTI; + param.cond = TEST_FULL_COND; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); +} +#endif +#endif + int main(int argc, char** argv) {