diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 9597deaecd..ae3893b965 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -19,9 +19,9 @@ extern "C" { #endif -#define MJOIN_DEFAULT_BLK_ROWS_NUM 4096 -#define MJOIN_HJOIN_CART_THRESHOLD 16 -#define MJOIN_BLK_SIZE_LIMIT 10485760 +#define MJOIN_DEFAULT_BLK_ROWS_NUM 2 //4096 +#define MJOIN_HJOIN_CART_THRESHOLD 1024 //16 +#define MJOIN_BLK_SIZE_LIMIT 0 //10485760 #define MJOIN_ROW_BITMAP_SIZE (2 * 1048576) struct SMJoinOperatorInfo; @@ -203,7 +203,7 @@ typedef struct SMJoinOperatorInfo { SFilterInfo* pFPreFilter; SFilterInfo* pPreFilter; SFilterInfo* pFinFilter; -// SMJoinFuncs* joinFps; + joinImplFp joinFp; SMJoinCtx ctx; SMJoinExecInfo execInfo; } SMJoinOperatorInfo; @@ -223,6 +223,7 @@ typedef struct SMJoinOperatorInfo { #define GRP_DONE(_grp) ((_grp)->readIdx > (_grp)->endIdx) #define MJOIN_PROBE_TB_ROWS_DONE(_tb) ((_tb)->blkRowIdx >= (_tb)->blk->info.rows) +#define FJOIN_PROBE_TB_ROWS_DONE(_tb) ((NULL == (_tb)->blk) || ((_tb)->blkRowIdx >= (_tb)->blk->info.rows)) #define MJOIN_BUILD_TB_ROWS_DONE(_tb) ((NULL == (_tb)->blk) || ((_tb)->blkRowIdx >= (_tb)->blk->info.rows)) #define BLK_IS_FULL(_blk) ((_blk)->info.rows == (_blk)->info.capacity) @@ -237,7 +238,7 @@ typedef struct SMJoinOperatorInfo { (_col) = taosArrayGet((_tb)->blk->pDataBlock, (_tb)->primCol->srcSlot); \ (_ts) = *((int64_t*)(_col)->pData + (_tb)->blkRowIdx); \ } else { \ - (_ts) = INT64_MIN; \ + (_ts) = INT64_MAX; \ } \ } while (0) @@ -267,7 +268,9 @@ typedef struct SMJoinOperatorInfo { int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode); +SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator); SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator); +SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator); bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb); void mJoinSetDone(SOperatorInfo* pOperator); bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen); @@ -286,7 +289,8 @@ int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBl int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startGrpIdx, int32_t startRowIdx); int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startRowIdx); int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t *rowBitmapOffset); -int32_t mJoinProcessNonEqualGrp(SMJoinMergeCtx* pCtx, SColumnInfoData* pCol, bool probeGrp, int64_t* probeTs, int64_t* buildTs); +int32_t mJoinProcessUnreachGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs); +int32_t mJoinProcessOverGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs); #ifdef __cplusplus } diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index a867c167b9..9ff2408c29 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -481,7 +481,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); } else if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) { - MJ_ERR_JRET(mJoinProcessNonEqualGrp(pCtx, pProbeCol, true, &probeTs, &buildTs)); + MJ_ERR_JRET(mJoinProcessUnreachGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -742,14 +742,9 @@ static FORCE_INLINE int32_t mFullJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { static bool mFullJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); - bool buildGot = false; - - if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { - buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); - } + bool buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); if (!probeGot && !buildGot) { - mJoinSetDone(pOperator); return false; } @@ -764,7 +759,7 @@ static int32_t mFullJoinMergeCart(SMJoinMergeCtx* pCtx) { return (NULL == pCtx->pJoin->pFPreFilter) ? mOuterJoinMergeFullCart(pCtx) : mOuterJoinMergeSeqCart(pCtx); } -const uint8_t lowest_bit_bitmap[] = {32, 7, 6, 32, 5, 4, 32, 0, 4, 1, 2}; +const uint8_t lowest_bit_bitmap[] = {32, 7, 6, 32, 5, 3, 32, 0, 4, 1, 2}; static FORCE_INLINE int32_t mFullJoinOutputHashRow(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows, int32_t idx) { SMJoinGrpRows grp = {0}; @@ -775,13 +770,14 @@ static FORCE_INLINE int32_t mFullJoinOutputHashRow(SMJoinMergeCtx* pCtx, SMJoinH return mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, &grp, false); } -static int32_t mFullJoinOutputHashGrpRows(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows, SMJoinNMatchCtx* pNMatch) { - int32_t rowNum = taosArrayGetSize(pGrpRows->pRows); +static int32_t mFullJoinOutputHashGrpRows(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows, SMJoinNMatchCtx* pNMatch, bool* grpDone) { + int32_t rowNum = taosArrayGetSize(pGrpRows->pRows); for (; pNMatch->rowIdx < rowNum && !BLK_IS_FULL(pCtx->finBlk); ++pNMatch->rowIdx) { MJ_ERR_RET(mFullJoinOutputHashRow(pCtx, pGrpRows, pNMatch->rowIdx)); } if (pNMatch->rowIdx >= rowNum) { + *grpDone = true; pNMatch->rowIdx = 0; } @@ -807,9 +803,15 @@ static int32_t mFullJoinHandleHashGrpRemains(SMJoinMergeCtx* pCtx) { if (pGrpRows->rowMatchNum <= 0 || pGrpRows->allRowsNMatch) { pGrpRows->allRowsNMatch = true; - - MJ_ERR_RET(mFullJoinOutputHashGrpRows(pCtx, pGrpRows, pNMatch)); + + bool grpDone = false; + MJ_ERR_RET(mFullJoinOutputHashGrpRows(pCtx, pGrpRows, pNMatch, &grpDone)); if (BLK_IS_FULL(pCtx->finBlk)) { + if (grpDone) { + pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter); + pNMatch->bitIdx = 0; + } + pCtx->nmatchRemains = true; return TSDB_CODE_SUCCESS; } @@ -818,8 +820,9 @@ static int32_t mFullJoinHandleHashGrpRemains(SMJoinMergeCtx* pCtx) { pNMatch->bitIdx = 0; continue; } - - int32_t bitBytes = BitmapLen(taosArrayGetSize(pGrpRows->pRows)); + + int32_t grpRowNum = taosArrayGetSize(pGrpRows->pRows); + int32_t bitBytes = BitmapLen(grpRowNum); for (; pNMatch->bitIdx < bitBytes; ++pNMatch->bitIdx) { if (0 == build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx]) { continue; @@ -829,6 +832,11 @@ static int32_t mFullJoinHandleHashGrpRemains(SMJoinMergeCtx* pCtx) { char *v = &build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx]; while (*v && !BLK_IS_FULL(pCtx->finBlk)) { uint8_t n = lowest_bit_bitmap[((*v & (*v - 1)) ^ *v) % 11]; + if (baseIdx + n >= grpRowNum) { + MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, n); + continue; + } + MJ_ERR_RET(mFullJoinOutputHashRow(pCtx, pGrpRows, baseIdx + n)); MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, n); if (++pGrpRows->rowMatchNum == taosArrayGetSize(pGrpRows->pRows)) { @@ -839,6 +847,11 @@ static int32_t mFullJoinHandleHashGrpRemains(SMJoinMergeCtx* pCtx) { } if (BLK_IS_FULL(pCtx->finBlk)) { + if (pNMatch->bitIdx == bitBytes) { + pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter); + pNMatch->bitIdx = 0; + } + pCtx->nmatchRemains = true; return TSDB_CODE_SUCCESS; } @@ -863,12 +876,13 @@ static FORCE_INLINE int32_t mFullJoinOutputMergeRow(SMJoinMergeCtx* pCtx, SMJoin } -static int32_t mFullJoinOutputMergeGrpRows(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrpRows, SMJoinNMatchCtx* pNMatch) { - for (; pNMatch->rowIdx < pGrpRows->endIdx && !BLK_IS_FULL(pCtx->finBlk); ++pNMatch->rowIdx) { +static int32_t mFullJoinOutputMergeGrpRows(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrpRows, SMJoinNMatchCtx* pNMatch, bool* grpDone) { + for (; pNMatch->rowIdx <= pGrpRows->endIdx && !BLK_IS_FULL(pCtx->finBlk); ++pNMatch->rowIdx) { MJ_ERR_RET(mFullJoinOutputMergeRow(pCtx, pGrpRows, pNMatch->rowIdx)); } - if (pNMatch->rowIdx >= pGrpRows->endIdx) { + if (pNMatch->rowIdx > pGrpRows->endIdx) { + *grpDone = true; pNMatch->rowIdx = 0; } @@ -879,11 +893,13 @@ static int32_t mFullJoinOutputMergeGrpRows(SMJoinMergeCtx* pCtx, SMJoinGrpRows* static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) { SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinNMatchCtx* pNMatch = &build->nMatchCtx; - + bool grpDone = false; int32_t baseIdx = 0; int32_t rowNum = 0; int32_t grpNum = taosArrayGetSize(build->eqGrps); - for (; pNMatch->grpIdx < grpNum; ++pNMatch->grpIdx) { + for (; pNMatch->grpIdx < grpNum; ++pNMatch->grpIdx, pNMatch->bitIdx = 0) { + grpDone = false; + SMJoinGrpRows* pGrpRows = taosArrayGet(build->eqGrps, pNMatch->grpIdx); if (pGrpRows->allRowsMatch) { continue; @@ -892,9 +908,14 @@ static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) { if (pGrpRows->rowMatchNum <= 0 || pGrpRows->allRowsNMatch) { pGrpRows->allRowsNMatch = true; - MJ_ERR_RET(mFullJoinOutputMergeGrpRows(pCtx, pGrpRows, pNMatch)); + MJ_ERR_RET(mFullJoinOutputMergeGrpRows(pCtx, pGrpRows, pNMatch, &grpDone)); if (BLK_IS_FULL(pCtx->finBlk)) { + if (grpDone) { + ++pNMatch->grpIdx; + pNMatch->bitIdx = 0; + } + pCtx->nmatchRemains = true; return TSDB_CODE_SUCCESS; } @@ -913,6 +934,12 @@ static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) { char *v = &build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx]; while (*v && !BLK_IS_FULL(pCtx->finBlk)) { uint8_t n = lowest_bit_bitmap[((*v & (*v - 1)) ^ *v) % 11]; + if (baseIdx + n > pGrpRows->endIdx) { + MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, n); + continue; + } + + ASSERT(baseIdx + n <= pGrpRows->endIdx); MJ_ERR_RET(mFullJoinOutputMergeRow(pCtx, pGrpRows, baseIdx + n)); MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, n); @@ -925,6 +952,11 @@ static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) { } if (BLK_IS_FULL(pCtx->finBlk)) { + if (pNMatch->bitIdx == bitBytes) { + ++pNMatch->grpIdx; + pNMatch->bitIdx = 0; + } + pCtx->nmatchRemains = true; return TSDB_CODE_SUCCESS; } @@ -976,6 +1008,14 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { do { if (!mFullJoinRetrieve(pOperator, pJoin, pCtx)) { + if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) { + MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + } + + mJoinSetDone(pOperator); break; } @@ -988,7 +1028,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { return pCtx->finBlk; } - if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) { + if (FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) { continue; } else { MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); @@ -1000,7 +1040,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { } } - while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { + while (!FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { if (probeTs == buildTs) { pCtx->lastEqTs = probeTs; MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false)); @@ -1022,9 +1062,9 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { } if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) { - MJ_ERR_JRET(mJoinProcessNonEqualGrp(pCtx, pProbeCol, true, &probeTs, &buildTs)); + MJ_ERR_JRET(mJoinProcessUnreachGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs)); } else { - MJ_ERR_JRET(mJoinProcessNonEqualGrp(pCtx, pBuildCol, false, &probeTs, &buildTs)); + MJ_ERR_JRET(mJoinProcessOverGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs)); } if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { @@ -1032,7 +1072,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { } } - if (pJoin->build->dsFetchDone && !MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) { + if (pJoin->build->dsFetchDone && !FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) { pCtx->probeNEqGrp.blk = pJoin->probe->blk; pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx; pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx; @@ -1046,7 +1086,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { } } - if (pJoin->probe->dsFetchDone && !MJOIN_PROBE_TB_ROWS_DONE(pJoin->build)) { + if (pJoin->probe->dsFetchDone && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { pCtx->buildNEqGrp.blk = pJoin->build->blk; pCtx->buildNEqGrp.beginIdx = pJoin->build->blkRowIdx; pCtx->buildNEqGrp.readIdx = pCtx->buildNEqGrp.beginIdx; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index a9ad1a7ab4..36fa07ea52 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -53,7 +53,7 @@ int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo } else { bool* pRes = (bool*)p->pData; for (int32_t i = 0; i < pBlock->info.rows; ++i) { - if ((status == FILTER_RESULT_PARTIAL_QUALIFIED && false == *pRes) || MJOIN_ROW_BITMAP_SET(build->pRowBitmap, build->pHashGrpRows->rowBitmapOffset, startRowIdx + i)) { + if ((status == FILTER_RESULT_PARTIAL_QUALIFIED && false == *(pRes + i)) || MJOIN_ROW_BITMAP_SET(build->pRowBitmap, build->pHashGrpRows->rowBitmapOffset, startRowIdx + i)) { continue; } @@ -113,8 +113,8 @@ int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SM continue; } - for (int32_t m = startRowIdx; m < buildGrp->endIdx && rowNum < pBlock->info.rows; ++m, ++rowNum) { - if ((status == FILTER_RESULT_PARTIAL_QUALIFIED && false == *pRes) || MJOIN_ROW_BITMAP_SET(build->pRowBitmap, buildGrp->rowBitmapOffset, m - buildGrp->beginIdx)) { + for (int32_t m = startRowIdx; m <= buildGrp->endIdx && rowNum < pBlock->info.rows; ++m, ++rowNum) { + if ((status == FILTER_RESULT_PARTIAL_QUALIFIED && false == *(pRes + rowNum)) || MJOIN_ROW_BITMAP_SET(build->pRowBitmap, buildGrp->rowBitmapOffset, m - buildGrp->beginIdx)) { continue; } @@ -367,46 +367,52 @@ int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastB pCtx->hashJoin = false; - if (pJoin->build->rowBitmapSize > 0) { + if (!lastBuildGrp && pJoin->build->rowBitmapSize > 0) { mJoinAllocGrpRowBitmap(pJoin->build); } return (*pCtx->mergeCartFp)(pCtx); } -int32_t mJoinProcessNonEqualGrp(SMJoinMergeCtx* pCtx, SColumnInfoData* pCol, bool probeGrp, int64_t* probeTs, int64_t* buildTs) { - SMJoinGrpRows* pGrp; - SMJoinTableCtx* pTb; - int64_t* pTs; - - if (probeGrp) { - pGrp = &pCtx->probeNEqGrp; - pTb = pCtx->pJoin->probe; - pTs = probeTs; - } else { - pGrp = &pCtx->buildNEqGrp; - pTb = pCtx->pJoin->build; - pTs = buildTs; - } - - pGrp->blk = pTb->blk; - pGrp->beginIdx = pTb->blkRowIdx; - pGrp->readIdx = pGrp->beginIdx; - pGrp->endIdx = pGrp->beginIdx; +int32_t mJoinProcessUnreachGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { + pCtx->probeNEqGrp.blk = pTb->blk; + pCtx->probeNEqGrp.beginIdx = pTb->blkRowIdx; + pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx; + pCtx->probeNEqGrp.endIdx = pCtx->probeNEqGrp.beginIdx; while (++pTb->blkRowIdx < pTb->blk->info.rows) { - MJOIN_GET_TB_CUR_TS(pCol, *pTs, pTb); + MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pTb); if (PROBE_TS_UNREACH(pCtx->ascTs, *probeTs, *buildTs)) { - pGrp->endIdx = pTb->blkRowIdx; + pCtx->probeNEqGrp.endIdx = pTb->blkRowIdx; continue; } break; } - return mJoinNonEqCart(pCtx, pGrp, probeGrp); + return mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true); } +int32_t mJoinProcessOverGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { + pCtx->buildNEqGrp.blk = pTb->blk; + pCtx->buildNEqGrp.beginIdx = pTb->blkRowIdx; + pCtx->buildNEqGrp.readIdx = pCtx->buildNEqGrp.beginIdx; + pCtx->buildNEqGrp.endIdx = pCtx->buildNEqGrp.beginIdx; + + while (++pTb->blkRowIdx < pTb->blk->info.rows) { + MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pTb); + if (PROBE_TS_OVER(pCtx->ascTs, *probeTs, *buildTs)) { + pCtx->buildNEqGrp.endIdx = pTb->blkRowIdx; + continue; + } + + break; + } + + return mJoinNonEqCart(pCtx, &pCtx->buildNEqGrp, false); +} + + SOperatorInfo** mJoinBuildDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream) { SOperatorInfo** p = taosMemoryMalloc(2 * POINTER_BYTES); if (p) { @@ -548,13 +554,8 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin switch (pInfo->joinType) { case JOIN_TYPE_INNER: case JOIN_TYPE_FULL: - if (pInfo->tbs[0].inputStat.inputRowNum <= pInfo->tbs[1].inputStat.inputRowNum) { - buildIdx = 0; - probeIdx = 1; - } else { - buildIdx = 1; - probeIdx = 0; - } + buildIdx = 1; + probeIdx = 0; break; case JOIN_TYPE_LEFT: buildIdx = 1; @@ -957,8 +958,7 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { SSDataBlock* pBlock = NULL; while (true) { - //pBlock = (*pJoin->joinFps)(pOperator); - pBlock = mLeftJoinDo(pOperator); + pBlock = (*pJoin->joinFp)(pOperator); if (NULL == pBlock) { if (pJoin->errCode) { ASSERT(0); @@ -1062,6 +1062,24 @@ int32_t mJoinHandleConds(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJo return TSDB_CODE_SUCCESS; } +int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) { + switch (pJoin->joinType) { + case JOIN_TYPE_INNER: + pJoin->joinFp = mInnerJoinDo; + break; + case JOIN_TYPE_LEFT: + case JOIN_TYPE_RIGHT: + pJoin->joinFp = mLeftJoinDo; + break; + case JOIN_TYPE_FULL: + pJoin->joinFp = mFullJoinDo; + break; + default: + break; + } + + return TSDB_CODE_SUCCESS; +} SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { @@ -1087,6 +1105,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); + MJ_ERR_JRET(mJoinSetImplFp(pInfo)); MJ_ERR_JRET(mJoinInitCtx(pInfo, pJoinNode)); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mJoinMainProcess, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index 710df47752..aff129edc0 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -66,7 +66,7 @@ enum { }; #define COL_DISPLAY_WIDTH 18 -#define JT_MAX_LOOP 100000 +#define JT_MAX_LOOP 3000 #define LEFT_BLK_ID 0 #define RIGHT_BLK_ID 1 @@ -156,6 +156,9 @@ typedef struct { SOperatorInfo* pJoinOp; int32_t loopIdx; + + int32_t rightFinMatchNum; + bool* rightFinMatch; } SJoinTestCtx; typedef struct { @@ -174,7 +177,7 @@ typedef struct { SJoinTestCtx jtCtx = {0}; -SJoinTestCtrl jtCtrl = {0, 0, 0, 0}; +SJoinTestCtrl jtCtrl = {1, 1, 1, 0}; SJoinTestStat jtStat = {0}; SJoinTestResInfo jtRes = {0}; @@ -1144,6 +1147,136 @@ void leftJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { } + +void fullJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { + bool leftMatch = false, rightMatch = false, lfilterOut = false, rfilterOut = false; + void* lValue = NULL, *rValue = NULL, *filterValue = NULL; + int64_t lBig = 0, rBig = 0, fbig = 0; + int64_t rightTbOffset = jtCtx.blkRowSize * leftGrpRows; + + memset(jtCtx.rightFinMatch, 0, rightGrpRows * sizeof(bool)); + + for (int32_t l = 0; l < leftGrpRows; ++l) { + char* lrow = jtCtx.colRowDataBuf + jtCtx.blkRowSize * l; + + lfilterOut = false; + leftMatch = false; + + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + lValue = lrow + jtCtx.colRowOffset[c]; + switch (jtInputColType[c]) { + case TSDB_DATA_TYPE_TIMESTAMP: + fbig = TIMESTAMP_FILTER_VALUE; + lBig = *(int64_t*)lValue; + break; + case TSDB_DATA_TYPE_INT: + fbig = INT_FILTER_VALUE; + lBig = *(int32_t*)lValue; + break; + case TSDB_DATA_TYPE_BIGINT: + fbig = BIGINT_FILTER_VALUE; + lBig = *(int64_t*)lValue; + break; + default: + break; + } + + if (jtCtx.leftFilterNum && jtCtx.leftFilterColList[c] && ((*(bool*)(lrow + c)) || lBig <= fbig)) { + lfilterOut = true; + } + } + + for (int32_t r = 0; r < rightGrpRows; ++r) { + char* rrow = jtCtx.colRowDataBuf + rightTbOffset + jtCtx.blkRowSize * r; + rightMatch = true; + rfilterOut = false; + + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + lValue = lrow + jtCtx.colRowOffset[c]; + + if (!*(bool*)(rrow + c)) { + rValue = rrow + jtCtx.colRowOffset[c]; + } + + switch (jtInputColType[c]) { + case TSDB_DATA_TYPE_TIMESTAMP: + fbig = TIMESTAMP_FILTER_VALUE; + lBig = *(int64_t*)lValue; + rBig = *(int64_t*)rValue; + break; + case TSDB_DATA_TYPE_INT: + fbig = INT_FILTER_VALUE; + lBig = *(int32_t*)lValue; + rBig = *(int32_t*)rValue; + break; + case TSDB_DATA_TYPE_BIGINT: + fbig = BIGINT_FILTER_VALUE; + lBig = *(int64_t*)lValue; + rBig = *(int64_t*)rValue; + break; + default: + break; + } + + if (jtCtx.colEqNum && jtCtx.colEqList[c] && ((*(bool*)(lrow + c)) || (*(bool*)(rrow + c)) || lBig != rBig)) { + rightMatch = false; + } + + if (jtCtx.colOnNum && jtCtx.colOnList[c] && ((*(bool*)(lrow + c)) || (*(bool*)(rrow + c)) || lBig <= rBig)) { + rightMatch = false; + } + + if (jtCtx.rightFilterNum && jtCtx.rightFilterColList[c] && ((*(bool*)(rrow + c)) || rBig <= fbig)) { + rfilterOut = true; + } + } + + if (rightMatch) { + jtCtx.rightFinMatch[r] = true; + } + + if (rfilterOut) { + if (!rightMatch) { + jtCtx.rightFinMatch[r] = true; + } + continue; + } + + if (!lfilterOut && rightMatch) { + putMatchRowToRes(lrow, rrow); + leftMatch= true; + } + } + + if (!lfilterOut && !leftMatch && 0 == jtCtx.rightFilterNum) { + putNMatchRowToRes(lrow, 0, MAX_SLOT_NUM); + } + } + + if (0 == jtCtx.leftFilterNum) { + for (int32_t r = 0; r < rightGrpRows; ++r) { + if (!jtCtx.rightFinMatch[r]) { + char* rrow = jtCtx.colRowDataBuf + rightTbOffset + jtCtx.blkRowSize * r; + putNMatchRowToRes(rrow, MAX_SLOT_NUM, 0); + } + } + } +} + + +void appendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { + switch (jtCtx.joinType) { + case JOIN_TYPE_LEFT: + leftJoinAppendEqGrpRes(leftGrpRows, rightGrpRows); + break; + case JOIN_TYPE_FULL: + fullJoinAppendEqGrpRes(leftGrpRows, rightGrpRows); + break; + default: + break; + } +} + void createTsEqGrpRows(SSDataBlock** ppLeft, SSDataBlock** ppRight, int32_t leftGrpRows, int32_t rightGrpRows) { if (leftGrpRows <= 0 && rightGrpRows <= 0) { return; @@ -1170,7 +1303,7 @@ void createTsEqGrpRows(SSDataBlock** ppLeft, SSDataBlock** ppRight, int32_t left makeAppendBlkData(ppLeft, ppRight, leftGrpRows, rightGrpRows); - leftJoinAppendEqGrpRes(leftGrpRows, rightGrpRows); + appendEqGrpRes(leftGrpRows, rightGrpRows); } @@ -1240,6 +1373,12 @@ void createDummyBlkList(int32_t leftMaxRows, int32_t leftMaxGrpRows, int32_t rig jtCtx.rightMaxGrpRows = rightMaxGrpRows; jtCtx.blkRows = blkRows; + int32_t maxGrpRows = TMAX(leftMaxGrpRows, rightMaxGrpRows); + if (maxGrpRows > jtCtx.rightFinMatchNum) { + jtCtx.rightFinMatchNum = maxGrpRows; + jtCtx.rightFinMatch = (bool*)taosMemoryRealloc(jtCtx.rightFinMatch, maxGrpRows * sizeof(bool)); + } + createBothBlkRowsData(); } @@ -1638,7 +1777,7 @@ void runSingleTest(char* caseName, SJoinTestParam* param) { bool contLoop = true; SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param->joinType, param->subType, param->cond, param->filter, param->asc); - createDummyBlkList(200, 200, 200, 200, 10); + createDummyBlkList(10, 10, 10, 10, 2); while (contLoop) { rerunBlockedHere(); @@ -1663,8 +1802,14 @@ void runSingleTest(char* caseName, SJoinTestParam* param) { handleTestDone(); } +void handleCaseEnd() { + taosMemoryFree(jtCtx.rightFinMatch); + jtCtx.rightFinMatchNum = 0; +} + } // namespace +#if 0 #if 1 TEST(leftOuterJoin, noCondTest) { SJoinTestParam param; @@ -1764,6 +1909,108 @@ TEST(leftOuterJoin, fullCondTest) { taosMemoryFree(pTask); } #endif +#endif + +#if 0 +TEST(fullOuterJoin, noCondTest) { + SJoinTestParam param; + char* caseName = "fullOuterJoin:noCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_FULL; + param.subType = JOIN_STYPE_OUTER; + param.cond = TEST_NO_COND; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); +} +#endif + +#if 0 +TEST(fullOuterJoin, eqCondTest) { + SJoinTestParam param; + char* caseName = "fullOuterJoin:eqCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_FULL; + param.subType = JOIN_STYPE_OUTER; + param.cond = TEST_EQ_COND; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); + handleCaseEnd(); +} +#endif + +#if 1 +TEST(fullOuterJoin, onCondTest) { + SJoinTestParam param; + char* caseName = "fullOuterJoin:onCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_FULL; + param.subType = JOIN_STYPE_OUTER; + param.cond = TEST_ON_COND; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); +} +#endif + +#if 1 +TEST(fullOuterJoin, fullCondTest) { + SJoinTestParam param; + char* caseName = "fullOuterJoin:fullCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_FULL; + param.subType = JOIN_STYPE_OUTER; + param.cond = TEST_FULL_COND; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); +} +#endif int main(int argc, char** argv) { diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 9df8acf683..cc3b7800d2 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1255,9 +1255,14 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_LOGIC_PLAN_JOIN: { SJoinLogicNode* pLogicNode = (SJoinLogicNode*)pNode; destroyLogicNode((SLogicNode*)pLogicNode); + nodesDestroyNode(pLogicNode->pWindowOffset); + nodesDestroyNode(pLogicNode->pJLimit); nodesDestroyNode(pLogicNode->pPrimKeyEqCond); - nodesDestroyNode(pLogicNode->pFullOnCond); nodesDestroyNode(pLogicNode->pColEqCond); + nodesDestroyNode(pLogicNode->pColOnCond); + nodesDestroyNode(pLogicNode->pTagEqCond); + nodesDestroyNode(pLogicNode->pTagOnCond); + nodesDestroyNode(pLogicNode->pFullOnCond); break; } case QUERY_NODE_LOGIC_PLAN_AGG: { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ec5190cb24..44d0fc2335 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1123,7 +1123,6 @@ static SNode* biMakeTbnameProjectAstNode(char* funcName, char* tableAlias) { n->literal = tstrdup(tableAlias); n->node.resType.type = TSDB_DATA_TYPE_BINARY; n->node.resType.bytes = strlen(n->literal); - n->isDuration = false; n->translate = false; valNode = n; } diff --git a/tests/script/tsim/join/inner_join.sim b/tests/script/tsim/join/inner_join.sim new file mode 100644 index 0000000000..44a4bb6851 --- /dev/null +++ b/tests/script/tsim/join/inner_join.sim @@ -0,0 +1,106 @@ +sql connect +sql use test0; + +sql select a.col1, b.col1 from sta a inner join sta b on a.ts = b.ts and a.ts < '2023-11-17 16:29:02' order by a.col1, b.col1; +if $rows != 5 then + return -1 +endi +if $data00 != 1 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +if $data10 != 1 then + return -1 +endi +if $data11 != 2 then + return -1 +endi +if $data20 != 2 then + return -1 +endi +if $data21 != 1 then + return -1 +endi +if $data30 != 2 then + return -1 +endi +if $data31 != 2 then + return -1 +endi +if $data40 != 3 then + return -1 +endi +if $data41 != 3 then + return -1 +endi + +sql select a.col1, b.col1 from sta a join sta b on a.ts = b.ts where a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by a.col1, b.col1; +if $rows != 4 then + return -1 +endi +if $data00 != 1 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +if $data10 != 1 then + return -1 +endi +if $data11 != 2 then + return -1 +endi +if $data20 != 2 then + return -1 +endi +if $data21 != 1 then + return -1 +endi +if $data30 != 2 then + return -1 +endi +if $data31 != 2 then + return -1 +endi + +sql select a.col1, b.col1 from sta a join sta b on a.ts = b.ts; +if $rows != 12 then + return -1 +endi + +sql select a.col1, b.col1 from tba1 a join tba2 b on a.ts = b.ts order by a.col1, b.col1; +if $rows != 2 then + return -1 +endi +if $data00 != 1 then + return -1 +endi +if $data01 != 2 then + return -1 +endi +if $data10 != 4 then + return -1 +endi +if $data11 != 5 then + return -1 +endi + +sql select a.col1, b.col1 from tba2 a join tba1 b on a.ts = b.ts order by a.col1, b.col1; +if $rows != 2 then + return -1 +endi +if $data00 != 2 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +if $data10 != 5 then + return -1 +endi +if $data11 != 4 then + return -1 +endi + diff --git a/tests/script/tsim/join/join.sim b/tests/script/tsim/join/join.sim index bb4b9dbee0..38a0229ca2 100644 --- a/tests/script/tsim/join/join.sim +++ b/tests/script/tsim/join/join.sim @@ -5,19 +5,19 @@ sql connect sql drop database if exists test0 sql create database test0 vgroups 3; sql use test0; -create stable sta (ts timestamp, col1 int) tags(t1 int); -create table tba1 using sta tags(1); -create table tba2 using sta tags(2); +sql create stable sta (ts timestamp, col1 int) tags(t1 int); +sql create table tba1 using sta tags(1); +sql create table tba2 using sta tags(2); -insert into tba1 values ('2023-11-17 16:29:00', 1); -insert into tba1 values ('2023-11-17 16:29:02', 3); -insert into tba1 values ('2023-11-17 16:29:03', 4); -insert into tba1 values ('2023-11-17 16:29:04', 5); - -insert into tba2 values ('2023-11-17 16:29:00', 2); -insert into tba2 values ('2023-11-17 16:29:01', 3); -insert into tba2 values ('2023-11-17 16:29:03', 5); -insert into tba2 values ('2023-11-17 16:29:05', 7); +sql insert into tba1 values ('2023-11-17 16:29:00', 1); +sql insert into tba1 values ('2023-11-17 16:29:02', 3); +sql insert into tba1 values ('2023-11-17 16:29:03', 4); +sql insert into tba1 values ('2023-11-17 16:29:04', 5); + +sql insert into tba2 values ('2023-11-17 16:29:00', 2); +sql insert into tba2 values ('2023-11-17 16:29:01', 3); +sql insert into tba2 values ('2023-11-17 16:29:03', 5); +sql insert into tba2 values ('2023-11-17 16:29:05', 7); sql drop database if exists testa sql create database testa vgroups 3; @@ -57,13 +57,17 @@ sql insert into ctb22 using st2 tags(2) values('2023-10-16 09:10:12', 110222, 11 sql insert into ctb23 using st2 tags(3) values('2023-10-16 09:10:13', 110223, 1102230); sql insert into ctb24 using st2 tags(4) values('2023-10-16 09:10:14', 110224, 1102240); +run tsim/join/inner_join.sim run tsim/join/left_join.sim +run tsim/join/right_join.sim print ================== restart server to commit data into disk system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s start print ================== server restart completed +run tsim/join/inner_join.sim run tsim/join/left_join.sim +run tsim/join/right_join.sim system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/join/right_join.sim b/tests/script/tsim/join/right_join.sim new file mode 100644 index 0000000000..d0fdbcbf1d --- /dev/null +++ b/tests/script/tsim/join/right_join.sim @@ -0,0 +1,160 @@ +sql connect +sql use test0; + +sql select a.col1, b.col1 from sta a right join sta b on a.ts = b.ts and a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by b.col1, a.col1; +if $rows != 10 then + return -1 +endi +if $data00 != 1 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +if $data10 != 2 then + return -1 +endi +if $data11 != 1 then + return -1 +endi +if $data20 != 1 then + return -1 +endi +if $data21 != 2 then + return -1 +endi +if $data30 != 2 then + return -1 +endi +if $data31 != 2 then + return -1 +endi +if $data40 != NULL then + return -1 +endi +if $data41 != 3 then + return -1 +endi +if $data50 != NULL then + return -1 +endi +if $data51 != 3 then + return -1 +endi +if $data60 != NULL then + return -1 +endi +if $data61 != 4 then + return -1 +endi +if $data70 != NULL then + return -1 +endi +if $data71 != 5 then + return -1 +endi +if $data80 != NULL then + return -1 +endi +if $data81 != 5 then + return -1 +endi +if $data90 != NULL then + return -1 +endi +if $data91 != 7 then + return -1 +endi + +sql select a.col1, b.col1 from sta a right join sta b on a.ts = b.ts where a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by a.col1, b.col1; +if $rows != 4 then + return -1 +endi +if $data00 != 1 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +if $data10 != 1 then + return -1 +endi +if $data11 != 2 then + return -1 +endi +if $data20 != 2 then + return -1 +endi +if $data21 != 1 then + return -1 +endi +if $data30 != 2 then + return -1 +endi +if $data31 != 2 then + return -1 +endi + +sql select a.col1, b.col1 from sta a right join sta b on a.ts = b.ts; +if $rows != 12 then + return -1 +endi + +sql select a.col1, b.col1 from tba1 a right join tba2 b on a.ts = b.ts order by a.col1, b.col1; +if $rows != 4 then + return -1 +endi +if $data00 != NULL then + return -1 +endi +if $data01 != 3 then + return -1 +endi +if $data10 != NULL then + return -1 +endi +if $data11 != 7 then + return -1 +endi +if $data20 != 1 then + return -1 +endi +if $data21 != 2 then + return -1 +endi +if $data30 != 4 then + return -1 +endi +if $data31 != 5 then + return -1 +endi + +sql select a.col1, b.col1 from tba2 a right join tba1 b on a.ts = b.ts order by a.col1, b.col1; +if $rows != 4 then + return -1 +endi +if $data00 != NULL then + return -1 +endi +if $data01 != 3 then + return -1 +endi +if $data10 != NULL then + return -1 +endi +if $data11 != 5 then + return -1 +endi +if $data20 != 2 then + return -1 +endi +if $data21 != 1 then + return -1 +endi +if $data30 != 5 then + return -1 +endi +if $data31 != 4 then + return -1 +endi +