From 5dc49a8233200319617dcc08163776fd43e73890 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 3 Jan 2024 13:52:36 +0800 Subject: [PATCH] enh:support full join --- source/libs/executor/inc/mergejoin.h | 37 ++- source/libs/executor/src/mergejoin.c | 246 ++++++++++++++----- source/libs/executor/src/mergejoinoperator.c | 205 +++++++++++++++- 3 files changed, 411 insertions(+), 77 deletions(-) diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 2fc2cd3c02..33844d1816 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -97,13 +97,16 @@ typedef struct SMJoinTableCtx { int32_t grpArrayIdx; SArray* pGrpArrays; - int32_t grpRowIdx; - SArray* pHashCurGrp; - SSHashObj* pGrpHash; + int32_t grpRowIdx; + SArray* pHashCurGrp; + SMJoinHashGrpRows* pHashGrpRows; + SSHashObj* pGrpHash; - int64_t rowBitmapSize; - int64_t rowBitmapOffset; - char* pRowBitmap; + int64_t rowBitmapSize; + int64_t rowBitmapOffset; + char* pRowBitmap; + + SMJoinNMatchCtx nMatchCtx; } SMJoinTableCtx; typedef struct SMJoinGrpRows { @@ -113,9 +116,24 @@ typedef struct SMJoinGrpRows { int32_t readIdx; int32_t rowBitmapOffset; int32_t rowMatchNum; + bool allRowsMatch; bool readMatch; } SMJoinGrpRows; +typedef struct SMJoinHashGrpRows { + int32_t rowBitmapOffset; + int32_t rowMatchNum; + bool allRowsMatch; + SArray* pRows; +} SMJoinHashGrpRows; + +typedef struct SMJoinNMatchCtx { + void* pGrp; + int32_t iter; + int32_t bitIdx; + int32_t grpIdx; +} SMJoinNMatchCtx; + typedef struct SMJoinMergeCtx { struct SMJoinOperatorInfo* pJoin; bool ascTs; @@ -123,6 +141,7 @@ typedef struct SMJoinMergeCtx { bool keepOrder; bool grpRemains; bool midRemains; + bool nmatchRemains; bool lastEqGrp; bool lastProbeGrp; int32_t blkThreshold; @@ -203,6 +222,9 @@ typedef struct SMJoinOperatorInfo { #define BLK_IS_FULL(_blk) ((_blk)->info.rows == (_blk)->info.capacity) +#define MJOIN_ROW_BITMAP_SET(_b, _base, _idx) (!colDataIsNull_f((_b + _base), _idx)) +#define MJOIN_SET_ROW_BITMAP(_b, _base, _idx) (colDataClearNull_f((_b + _base), _idx)) + #define MJOIN_GET_TB_COL_TS(_col, _ts, _tb) \ do { \ @@ -246,7 +268,8 @@ void mJoinSetDone(SOperatorInfo* pOperator); bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen); int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart); int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableCtx* pTable, int64_t timestamp); -int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable); +int32_t mJoinCreateFullBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable); +int32_t mJoinCreateBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable); int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable); int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastBuildGrp); bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build); diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index 793f7ffccd..10e8a31a58 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -53,22 +53,30 @@ static int32_t mOuterJoinHashFullCart(SMJoinMergeCtx* pCtx) { continue; } - SArray** pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen); + void* pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen); if (NULL == pGrp) { probeGrp->endIdx = probeGrp->readIdx; MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true)); probeGrp->endIdx = probeEndIdx; - } else { - build->pHashCurGrp = *pGrp; - build->grpRowIdx = 0; - bool contLoop = mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build); - if (!contLoop) { - if (build->grpRowIdx < 0) { - probeGrp->readIdx++; - } - goto _return; - } + continue; } + + if (build->rowBitmapSize > 0) { + build->pHashCurGrp = ((SMJoinHashGrpRows*)pGrp)->pRows; + build->pHashGrpRows = pGrp; + build->pHashGrpRows->allRowsMatch = true; + } else { + build->pHashCurGrp = *(SArray**)pGrp; + } + + build->grpRowIdx = 0; + bool contLoop = mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build); + if (!contLoop) { + if (build->grpRowIdx < 0) { + probeGrp->readIdx++; + } + goto _return; + } } _return: @@ -149,6 +157,8 @@ static int32_t mOuterJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { int32_t probeEndIdx = probeGrp->endIdx; int32_t rowsLeft = pCtx->midBlk->info.capacity; bool contLoop = true; + int32_t startGrpIdx = 0; + int32_t startRowIdx = -1; blockDataCleanup(pCtx->midBlk); @@ -158,8 +168,14 @@ static int32_t mOuterJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { probeGrp->endIdx = probeGrp->readIdx; rowsLeft = pCtx->midBlk->info.capacity; + startGrpIdx = build->grpIdx; + startRowIdx = -1; + for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) { SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); + if (startRowIdx < 0) { + startRowIdx = buildGrp->readIdx; + } if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) { MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp)); @@ -179,7 +195,12 @@ static int32_t mOuterJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { } if (pCtx->midBlk->info.rows > 0) { - MJ_ERR_RET(doFilter(pCtx->midBlk, pCtx->pJoin->pFPreFilter, NULL)); + if (build->rowBitmapSize > 0) { + MJ_ERR_RET(mJoinFilterAndMarkRows(pCtx->midBlk, pCtx->pJoin->pFPreFilter, build, startGrpIdx, startRowIdx)); + } else { + MJ_ERR_RET(doFilter(pCtx->midBlk, pCtx->pJoin->pFPreFilter, NULL)); + } + if (pCtx->midBlk->info.rows > 0) { probeGrp->readMatch = true; } @@ -226,49 +247,21 @@ static int32_t mOuterJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { return TSDB_CODE_SUCCESS; } -int32_t mJoinFilterMarkRowsMatch(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startRowIdx) { - if (pFilterInfo == NULL || pBlock->info.rows == 0) { - return TSDB_CODE_SUCCESS; - } - - SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; - SColumnInfoData* p = NULL; - - int32_t code = filterSetDataFromSlotId(pFilterInfo, ¶m1); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } - - int32_t status = 0; - code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } - - extractQualifiedTupleByFilterResult(pBlock, p, status); - - code = TSDB_CODE_SUCCESS; - -_err: - colDataDestroy(p); - taosMemoryFree(p); - return code; -} - static int32_t mOuterJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop) { SMJoinTableCtx* probe = pCtx->pJoin->probe; SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); - int32_t startRowIdx = build->grpRowIdx; + int32_t startRowIdx = 0; blockDataCleanup(pCtx->midBlk); do { + startRowIdx = build->grpRowIdx; mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build); if (pCtx->midBlk->info.rows > 0) { if (build->rowBitmapSize > 0) { - MJ_ERR_RET(mJoinFilterMarkRowsMatch(pCtx->midBlk, pCtx->pJoin->pPreFilter, build, startRowIdx)); + MJ_ERR_RET(mJoinFilterAndMarkHashRows(pCtx->midBlk, pCtx->pJoin->pPreFilter, build, startRowIdx)); } else { MJ_ERR_RET(doFilter(pCtx->midBlk, pCtx->pJoin->pPreFilter, NULL)); } @@ -339,28 +332,38 @@ static int32_t mOuterJoinHashSeqCart(SMJoinMergeCtx* pCtx) { continue; } - SArray** pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen); + void* pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen); if (NULL == pGrp) { probeGrp->endIdx = probeGrp->readIdx; MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true)); probeGrp->endIdx = probeEndIdx; probeGrp->readIdx++; probeGrp->readMatch = false; + continue; + } + + if (build->rowBitmapSize > 0) { + build->pHashCurGrp = ((SMJoinHashGrpRows*)pGrp)->pRows; + build->pHashGrpRows = pGrp; + if (0 == build->pHashGrpRows->rowBitmapOffset) { + MJ_ERR_RET(mJoinGetRowBitmapOffset(build, taosArrayGetSize(build->pHashCurGrp), &build->pHashGrpRows->rowBitmapOffset)); + } } else { - build->pHashCurGrp = *pGrp; - build->grpRowIdx = 0; + build->pHashCurGrp = *(SArray**)pGrp; + } + + build->grpRowIdx = 0; - probeGrp->endIdx = probeGrp->readIdx; - MJ_ERR_RET(mOuterJoinHashGrpCartFilter(pCtx, &contLoop)); - probeGrp->endIdx = probeEndIdx; - if (build->grpRowIdx < 0) { - probeGrp->readIdx++; - probeGrp->readMatch = false; - } + probeGrp->endIdx = probeGrp->readIdx; + MJ_ERR_RET(mOuterJoinHashGrpCartFilter(pCtx, &contLoop)); + probeGrp->endIdx = probeEndIdx; + if (build->grpRowIdx < 0) { + probeGrp->readIdx++; + probeGrp->readMatch = false; + } - if (!contLoop) { - break; - } + if (!contLoop) { + break; } } @@ -757,6 +760,116 @@ static FORCE_INLINE int32_t mFullJoinHashCart(SMJoinMergeCtx* pCtx) { return (NULL == pCtx->pJoin->pPreFilter) ? mOuterJoinHashFullCart(pCtx) : mOuterJoinHashSeqCart(pCtx); } +static int32_t mFullJoinMergeCart(SMJoinMergeCtx* pCtx) { + return (NULL == pCtx->pJoin->pFPreFilter) ? mOuterJoinMergeFullCart(pCtx) : mOuterJoinMergeSeqCart(pCtx); +} + +const uint8_t lowest_bit_bitmap[] = {32, 7, 6, 32, 5, 4, 32, 0, 4, 1, 2}; + +static FORCE_INLINE int32_t mFullJoinOutputSingleRow(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows, int32_t idx) { + SMJoinGrpRows grp = {0}; + SMJoinRowPos* pPos = taosArrayGet(pGrpRows->pRows, idx); + grp.blk = pPos->pBlk; + grp.readIdx = pPos->pos; + grp.endIdx = pPos->pos; + return mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, &grp, false); +} + +static int32_t mFullJoinOutputAllRows(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows) { + int32_t rowNum = taosArrayGetSize(pGrpRows->pRows); + for (int32_t i = 0; i < rowNum && !BLK_IS_FULL(pCtx->finBlk); ++i) { + MJ_ERR_RET(mFullJoinOutputSingleRow(pCtx, pGrpRows, i)); + } + return TSDB_CODE_SUCCESS; +} + + +static int32_t mFullJoinHandleBuildGrpRemains(SMJoinMergeCtx* pCtx) { + SMJoinTableCtx* build = pCtx->pJoin->build; + SMJoinNMatchCtx* pNMatch = &build->nMatchCtx; + if (pCtx->hashJoin) { + while (NULL != (pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter))) { + SMJoinHashGrpRows* pGrpRows = (SMJoinHashGrpRows*)pNMatch->pGrp; + if (pGrpRows->allRowsMatch) { + continue; + } + + if (pGrpRows->rowMatchNum <= 0) { + MJ_ERR_RET(mFullJoinOutputAllRows(pCtx, pGrpRows)); + continue; + } + + int32_t bitBytes = BitmapLen(taosArrayGetSize(pGrpRows->pRows)); + for (; pNMatch->bitIdx < bitBytes; ++pNMatch->bitIdx) { + if (0 == build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx]) { + continue; + } + + char *v = &build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx]; + while (*v && !BLK_IS_FULL(pCtx->finBlk)) { + uint8_t n = lowest_bit_bitmap[((*v & (*v - 1)) ^ *v) % 11]; + MJ_ERR_RET(mFullJoinOutputSingleRow(pCtx, pGrpRows, 8 * pNMatch->bitIdx + n)); + MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, n); + if (++pGrpRows->rowMatchNum == taosArrayGetSize(pGrpRows->pRows)) { + pGrpRows->allRowsMatch = true; + pNMatch->bitIdx = bitBytes; + break; + } + } + + if (BLK_IS_FULL(pCtx->finBlk)) { + pCtx->nmatchRemains = true; + return TSDB_CODE_SUCCESS; + } + } + } + + pCtx->nmatchRemains = false; + pCtx->lastEqGrp = false; + return TSDB_CODE_SUCCESS; + } + + int32_t grpNum = taosArrayGetSize(build->eqGrps); + for (; pNMatch->grpIdx < grpNum; ++pNMatch->grpIdx) { + SMJoinGrpRows* pGrpRows = taosArrayGet(build->eqGrps, pNMatch->grpIdx); + if (pGrpRows->allRowsMatch) { + continue; + } + + if (pGrpRows->rowMatchNum <= 0) { + MJ_ERR_RET(mFullJoinOutputAllRows(pCtx, pGrpRows)); + continue; + } + + int32_t bitBytes = BitmapLen(pGrpRows->endIdx - pGrpRows->beginIdx + 1); + for (; pNMatch->bitIdx < bitBytes; ++pNMatch->bitIdx) { + if (0 == build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx]) { + continue; + } + + char *v = &build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx]; + while (*v && !BLK_IS_FULL(pCtx->finBlk)) { + uint8_t n = lowest_bit_bitmap[((*v & (*v - 1)) ^ *v) % 11]; + SMJoinGrpRows grp = {0}; + grp.blk = pGrpRows->blk; + grp.readIdx = 8 * pNMatch->bitIdx + n; + grp.endIdx = grp.readIdx; + MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, &grp, false)); + colDataSetNull_f(build->pRowBitmap + pGrpRows->rowBitmapOffset + pNMatch->bitIdx, n); + } + } + + if (BLK_IS_FULL(pCtx->finBlk)) { + pCtx->nmatchRemains = true; + return TSDB_CODE_SUCCESS; + } + } + + pCtx->nmatchRemains = false; + pCtx->lastEqGrp = false; + return TSDB_CODE_SUCCESS; +} + SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoin = pOperator->info; SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; @@ -784,8 +897,15 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { pCtx->grpRemains = false; } + if (pCtx->nmatchRemains) { + MJ_ERR_JRET(mFullJoinHandleBuildGrpRemains(pCtx)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + } + do { - if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) { + if (!mFullJoinRetrieve(pOperator, pJoin, pCtx)) { break; } @@ -803,6 +923,11 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { } else { MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); } + } else if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) { + MJ_ERR_JRET(mFullJoinHandleBuildGrpRemains(pCtx)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } } while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { @@ -819,6 +944,13 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { continue; } + if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) { + MJ_ERR_JRET(mFullJoinHandleBuildGrpRemains(pCtx)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + } + if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) { MJ_ERR_JRET(mJoinProcessNonEqualGrp(pCtx, pProbeCol, true, &probeTs, &buildTs)); } else { diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 67a10eedaa..262104ca32 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -27,6 +27,119 @@ #include "ttypes.h" #include "mergejoin.h" + +int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startRowIdx) { + if (pFilterInfo == NULL || pBlock->info.rows == 0) { + return TSDB_CODE_SUCCESS; + } + + SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; + SColumnInfoData* p = NULL; + + int32_t code = filterSetDataFromSlotId(pFilterInfo, ¶m1); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + int32_t status = 0; + code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + if (!build->pHashGrpRows->allRowsMatch && (status == FILTER_RESULT_ALL_QUALIFIED || status == FILTER_RESULT_PARTIAL_QUALIFIED)) { + if (status == FILTER_RESULT_ALL_QUALIFIED && taosArrayGetSize(build->pHashCurGrp) == pBlock.info.rows) { + build->pHashGrpRows->allRowsMatch = true; + } else { + bool* pRes = (bool*)p->pData; + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + if ((status == FILTER_RESULT_PARTIAL_QUALIFIED && false == *pRes) || MJOIN_ROW_BITMAP_SET(build->pRowBitmap, build->pHashGrpRows->rowBitmapOffset, startRowIdx + i)) { + continue; + } + + MJOIN_SET_ROW_BITMAP(build->pRowBitmap, build->pHashGrpRows->rowBitmapOffset, startRowIdx + i); + build->pHashGrpRows->rowMatchNum++; + } + + if (build->pHashGrpRows->rowMatchNum == taosArrayGetSize(build->pHashGrpRows->pRows)) { + build->pHashGrpRows->allRowsMatch = true; + } + } + } + + extractQualifiedTupleByFilterResult(pBlock, p, status); + + code = TSDB_CODE_SUCCESS; + +_err: + colDataDestroy(p); + taosMemoryFree(p); + return code; +} + +int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startGrpIdx, int32_t startRowIdx) { + if (pFilterInfo == NULL || pBlock->info.rows == 0) { + return TSDB_CODE_SUCCESS; + } + + SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; + SColumnInfoData* p = NULL; + + int32_t code = filterSetDataFromSlotId(pFilterInfo, ¶m1); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + int32_t status = 0; + code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + int32_t rowNum = 0; + bool* pRes = (bool*)p->pData; + int32_t grpNum = taosArrayGetSize(build->eqGrps); + if (status == FILTER_RESULT_ALL_QUALIFIED || status == FILTER_RESULT_PARTIAL_QUALIFIED) { + for (int32_t i = startGrpIdx; i < grpNum && rowNum < pBlock->info.rows; startRowIdx = 0, ++i) { + SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, i); + if (buildGrp->allRowsMatch) { + rowNum += buildGrp->endIdx - startRowIdx + 1; + continue; + } + + if (status == FILTER_RESULT_ALL_QUALIFIED && startRowIdx == buildGrp->beginIdx && ((pBlock->info.rows - rowNum) >= (buildGrp->endIdx - startRowIdx + 1))) { + buildGrp->allRowsMatch = true; + rowNum += buildGrp->endIdx - startRowIdx + 1; + continue; + } + + for (int32_t m = startRowIdx; m < buildGrp->endIdx && rowNum < pBlock->info.rows; ++m, ++rowNum) { + if ((status == FILTER_RESULT_PARTIAL_QUALIFIED && false == *pRes) || MJOIN_ROW_BITMAP_SET(build->pRowBitmap, buildGrp->rowBitmapOffset, m - buildGrp->beginIdx)) { + continue; + } + + MJOIN_SET_ROW_BITMAP(build->pRowBitmap, buildGrp->rowBitmapOffset, m - buildGrp->beginIdx); + buildGrp->rowMatchNum++; + } + + if (buildGrp->rowMatchNum == (buildGrp->endIdx - buildGrp->beginIdx + 1))) { + buildGrp->allRowsMatch = true; + } + } + } + + extractQualifiedTupleByFilterResult(pBlock, p, status); + + code = TSDB_CODE_SUCCESS; + +_err: + colDataDestroy(p); + taosMemoryFree(p); + return code; +} + + + int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) { SSDataBlock* pLess = NULL; SSDataBlock* pMore = NULL; @@ -209,6 +322,17 @@ bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, S return true; } +int32_t mJoinAllocGrpRowBitmap(SMJoinTableCtx* pTb) { + int32_t grpNum = taosArrayGetSize(pTb); + for (int32_t i = 0; i < grpNum; ++i) { + SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayGet(pTb->eqGrps, i); + MJ_ERR_RET(mJoinGetRowBitmapOffset(pTb, pGrp->endIdx - pGrp->beginIdx + 1, &pGrp->rowBitmapOffset)); + pGrp->rowMatchNum = 0; + } + + return TSDB_CODE_SUCCESS; +} + int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastBuildGrp) { SMJoinOperatorInfo* pJoin = pCtx->pJoin; @@ -224,7 +348,11 @@ int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastB if (pCtx->hashCan && REACH_HJOIN_THRESHOLD(pJoin->probe, pJoin->build)) { if (!lastBuildGrp || !pCtx->hashJoin) { - MJ_ERR_RET(mJoinMakeBuildTbHash(pJoin, pJoin->build)); + if (pJoin->build->rowBitmapSize > 0) { + MJ_ERR_RET(mJoinCreateFullBuildTbHash(pJoin, pJoin->build)); + } else { + MJ_ERR_RET(mJoinCreateBuildTbHash(pJoin, pJoin->build)); + } } if (pJoin->probe->newBlk) { @@ -238,6 +366,10 @@ int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastB } pCtx->hashJoin = false; + + if (pJoin->build->rowBitmapSize > 0) { + mJoinAllocGrpRowBitmap(pJoin->build); + } return (*pCtx->mergeCartFp)(pCtx); } @@ -394,7 +526,7 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi return TSDB_CODE_OUT_OF_MEMORY; } - if (pJoin->pPreFilter && IS_FULL_OUTER_JOIN(pJoin->joinType, pJoin->subType)) { + if (pJoin->pFPreFilter && IS_FULL_OUTER_JOIN(pJoin->joinType, pJoin->subType)) { pTable->rowBitmapSize = MJOIN_ROW_BITMAP_SIZE; pTable->pRowBitmap = taosMemoryMalloc(pTable->rowBitmapSize); if (NULL == pTable->pRowBitmap) { @@ -513,7 +645,7 @@ static int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, i pTable->rowBitmapSize = newSize; } - memset(pTable->pRowBitmap + pTable->rowBitmapOffset, 0, bitmapLen); + memset(pTable->pRowBitmap + pTable->rowBitmapOffset, 0xFFFFFFFF, bitmapLen); *rowBitmapOffset = pTable->rowBitmapOffset; pTable->rowBitmapOffset += bitmapLen; @@ -521,6 +653,17 @@ static int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, i return TSDB_CODE_SUCCESS; } +void mJoinResetForBuildTable(SMJoinTableCtx* pTable) { + pTable->grpTotalRows = 0; + pTable->grpIdx = 0; + mJoinDestroyCreatedBlks(pTable->createdBlks); + taosArrayClear(pTable->eqGrps); + if (pTable->rowBitmapSize > 0) { + pTable->rowBitmapOffset = 1; + memset(&pTable->nMatchCtx, 0, sizeof(pTable->nMatchCtx)); + } +} + int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) { SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); SMJoinGrpRows* pGrp = NULL; @@ -530,10 +673,7 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol } if (restart) { - pTable->grpTotalRows = 0; - pTable->grpIdx = 0; - mJoinDestroyCreatedBlks(pTable->createdBlks); - taosArrayClear(pTable->eqGrps); + mJoinResetForBuildTable(pTable); } pGrp = taosArrayReserve(pTable->eqGrps, 1); @@ -569,11 +709,6 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol _return: - if (wholeBlk && pTable->rowBitmapSize > 0) { - MJ_ERR_RET(mJoinGetRowBitmapOffset(pTable, pGrp->endIdx - pGrp->beginIdx + 1, &pGrp->rowBitmapOffset)); - pGrp->rowMatchNum = 0; - } - pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1; return TSDB_CODE_SUCCESS; @@ -701,7 +836,50 @@ static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDat } -int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) { +static int32_t mJoinAddRowToFullHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDataBlock* pBlock, int32_t rowIdx) { + SMJoinTableCtx* pBuild = pJoin->build; + SMJoinRowPos pos = {pBlock, rowIdx}; + SMJoinHashGrpRows* pGrpRows = (SMJoinHashGrpRows*)tSimpleHashGet(pBuild->pGrpHash, pBuild->keyData, keyLen); + if (!pGrpRows) { + SMJoinHashGrpRows pNewGrp = {0}; + MJ_ERR_RET(mJoinGetAvailableGrpArray(pBuild, &pNewGrp.pRows)); + + taosArrayPush(pNewGrp.pRows, &pos); + tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, sizeof(pNewGrp)); + } else { + taosArrayPush(pGrpRows->pRows, &pos); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t mJoinCreateFullBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) { + size_t bufLen = 0; + + tSimpleHashClear(pJoin->build->pGrpHash); + pJoin->build->grpArrayIdx = 0; + + pJoin->build->grpRowIdx = -1; + + int32_t grpNum = taosArrayGetSize(pTable->eqGrps); + for (int32_t g = 0; g < grpNum; ++g) { + SMJoinGrpRows* pGrp = taosArrayGet(pTable->eqGrps, g); + MJ_ERR_RET(mJoinSetKeyColsData(pGrp->blk, pTable)); + + int32_t grpRows = GRP_REMAIN_ROWS(pGrp); + for (int32_t r = 0; r < grpRows; ++r) { + if (mJoinCopyKeyColsDataToBuf(pTable, pGrp->beginIdx + r, &bufLen)) { + continue; + } + + MJ_ERR_RET(mJoinAddRowToFullHash(pJoin, bufLen, pGrp->blk, pGrp->beginIdx + r)); + } + } + + return TSDB_CODE_SUCCESS; +} + +int32_t mJoinCreateBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) { size_t bufLen = 0; tSimpleHashClear(pJoin->build->pGrpHash); @@ -727,6 +905,7 @@ int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) return TSDB_CODE_SUCCESS; } + void mJoinResetTableCtx(SMJoinTableCtx* pCtx) { pCtx->dsInitDone = false; pCtx->dsFetchDone = false;