diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index b8a83959f3..231ec6f5ea 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -869,6 +869,9 @@ bool inWinRange(STimeWindow* range, STimeWindow* cur); int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, TSKEY* primaryKeys, int32_t prevPosition, int32_t order); +void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status); + + #ifdef __cplusplus } diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 33844d1816..9597deaecd 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -59,6 +59,23 @@ typedef struct SMJoinColInfo { SColumnInfoData* colData; } SMJoinColInfo; +typedef struct SMJoinHashGrpRows { + int32_t rowBitmapOffset; + int32_t rowMatchNum; + bool allRowsMatch; + bool allRowsNMatch; + SArray* pRows; +} SMJoinHashGrpRows; + + +typedef struct SMJoinNMatchCtx { + void* pGrp; + int32_t iter; + int32_t bitIdx; + int32_t rowIdx; + int32_t grpIdx; +} SMJoinNMatchCtx; + typedef struct SMJoinTableCtx { EJoinTableType type; @@ -116,23 +133,11 @@ typedef struct SMJoinGrpRows { int32_t readIdx; int32_t rowBitmapOffset; int32_t rowMatchNum; + bool allRowsNMatch; bool allRowsMatch; bool readMatch; } SMJoinGrpRows; -typedef struct SMJoinHashGrpRows { - int32_t rowBitmapOffset; - int32_t rowMatchNum; - bool allRowsMatch; - SArray* pRows; -} SMJoinHashGrpRows; - -typedef struct SMJoinNMatchCtx { - void* pGrp; - int32_t iter; - int32_t bitIdx; - int32_t grpIdx; -} SMJoinNMatchCtx; typedef struct SMJoinMergeCtx { struct SMJoinOperatorInfo* pJoin; @@ -223,7 +228,7 @@ typedef struct SMJoinOperatorInfo { #define BLK_IS_FULL(_blk) ((_blk)->info.rows == (_blk)->info.capacity) #define MJOIN_ROW_BITMAP_SET(_b, _base, _idx) (!colDataIsNull_f((_b + _base), _idx)) -#define MJOIN_SET_ROW_BITMAP(_b, _base, _idx) (colDataClearNull_f((_b + _base), _idx)) +#define MJOIN_SET_ROW_BITMAP(_b, _base, _idx) colDataClearNull_f((_b + _base), _idx) #define MJOIN_GET_TB_COL_TS(_col, _ts, _tb) \ @@ -278,6 +283,10 @@ int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx); int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp, bool probeGrp); int32_t mJoinNonEqCart(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp); int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin); +int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startGrpIdx, int32_t startRowIdx); +int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startRowIdx); +int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t *rowBitmapOffset); +int32_t mJoinProcessNonEqualGrp(SMJoinMergeCtx* pCtx, SColumnInfoData* pCol, bool probeGrp, int64_t* probeTs, int64_t* buildTs); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 95d26fdd0e..f562ac4cf0 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -77,7 +77,6 @@ static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); -static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status); static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index 10e8a31a58..a867c167b9 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -766,7 +766,7 @@ static int32_t mFullJoinMergeCart(SMJoinMergeCtx* pCtx) { const uint8_t lowest_bit_bitmap[] = {32, 7, 6, 32, 5, 4, 32, 0, 4, 1, 2}; -static FORCE_INLINE int32_t mFullJoinOutputSingleRow(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows, int32_t idx) { +static FORCE_INLINE int32_t mFullJoinOutputHashRow(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows, int32_t idx) { SMJoinGrpRows grp = {0}; SMJoinRowPos* pPos = taosArrayGet(pGrpRows->pRows, idx); grp.blk = pPos->pBlk; @@ -775,60 +775,113 @@ static FORCE_INLINE int32_t mFullJoinOutputSingleRow(SMJoinMergeCtx* pCtx, SMJoi return mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, &grp, false); } -static int32_t mFullJoinOutputAllRows(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows) { +static int32_t mFullJoinOutputHashGrpRows(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows, SMJoinNMatchCtx* pNMatch) { int32_t rowNum = taosArrayGetSize(pGrpRows->pRows); - for (int32_t i = 0; i < rowNum && !BLK_IS_FULL(pCtx->finBlk); ++i) { - MJ_ERR_RET(mFullJoinOutputSingleRow(pCtx, pGrpRows, i)); + for (; pNMatch->rowIdx < rowNum && !BLK_IS_FULL(pCtx->finBlk); ++pNMatch->rowIdx) { + MJ_ERR_RET(mFullJoinOutputHashRow(pCtx, pGrpRows, pNMatch->rowIdx)); } + + if (pNMatch->rowIdx >= rowNum) { + pNMatch->rowIdx = 0; + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t mFullJoinHandleHashGrpRemains(SMJoinMergeCtx* pCtx) { + SMJoinTableCtx* build = pCtx->pJoin->build; + SMJoinNMatchCtx* pNMatch = &build->nMatchCtx; + if (NULL == pNMatch->pGrp) { + pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter); + pNMatch->bitIdx = 0; + } + + int32_t baseIdx = 0; + while (NULL != pNMatch->pGrp) { + SMJoinHashGrpRows* pGrpRows = (SMJoinHashGrpRows*)pNMatch->pGrp; + if (pGrpRows->allRowsMatch) { + pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter); + pNMatch->bitIdx = 0; + continue; + } + + if (pGrpRows->rowMatchNum <= 0 || pGrpRows->allRowsNMatch) { + pGrpRows->allRowsNMatch = true; + + MJ_ERR_RET(mFullJoinOutputHashGrpRows(pCtx, pGrpRows, pNMatch)); + if (BLK_IS_FULL(pCtx->finBlk)) { + pCtx->nmatchRemains = true; + return TSDB_CODE_SUCCESS; + } + + pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter); + pNMatch->bitIdx = 0; + continue; + } + + int32_t bitBytes = BitmapLen(taosArrayGetSize(pGrpRows->pRows)); + for (; pNMatch->bitIdx < bitBytes; ++pNMatch->bitIdx) { + if (0 == build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx]) { + continue; + } + + baseIdx = 8 * pNMatch->bitIdx; + char *v = &build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx]; + while (*v && !BLK_IS_FULL(pCtx->finBlk)) { + uint8_t n = lowest_bit_bitmap[((*v & (*v - 1)) ^ *v) % 11]; + MJ_ERR_RET(mFullJoinOutputHashRow(pCtx, pGrpRows, baseIdx + n)); + MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, n); + if (++pGrpRows->rowMatchNum == taosArrayGetSize(pGrpRows->pRows)) { + pGrpRows->allRowsMatch = true; + pNMatch->bitIdx = bitBytes; + break; + } + } + + if (BLK_IS_FULL(pCtx->finBlk)) { + pCtx->nmatchRemains = true; + return TSDB_CODE_SUCCESS; + } + } + + pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter); + pNMatch->bitIdx = 0; + } + + pCtx->nmatchRemains = false; + pCtx->lastEqGrp = false; + + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE int32_t mFullJoinOutputMergeRow(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrpRows, int32_t idx) { + SMJoinGrpRows grp = {0}; + grp.blk = pGrpRows->blk; + grp.readIdx = idx; + grp.endIdx = idx; + return mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, &grp, false); +} + + +static int32_t mFullJoinOutputMergeGrpRows(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrpRows, SMJoinNMatchCtx* pNMatch) { + for (; pNMatch->rowIdx < pGrpRows->endIdx && !BLK_IS_FULL(pCtx->finBlk); ++pNMatch->rowIdx) { + MJ_ERR_RET(mFullJoinOutputMergeRow(pCtx, pGrpRows, pNMatch->rowIdx)); + } + + if (pNMatch->rowIdx >= pGrpRows->endIdx) { + pNMatch->rowIdx = 0; + } + return TSDB_CODE_SUCCESS; } -static int32_t mFullJoinHandleBuildGrpRemains(SMJoinMergeCtx* pCtx) { +static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) { SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinNMatchCtx* pNMatch = &build->nMatchCtx; - if (pCtx->hashJoin) { - while (NULL != (pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter))) { - SMJoinHashGrpRows* pGrpRows = (SMJoinHashGrpRows*)pNMatch->pGrp; - if (pGrpRows->allRowsMatch) { - continue; - } - if (pGrpRows->rowMatchNum <= 0) { - MJ_ERR_RET(mFullJoinOutputAllRows(pCtx, pGrpRows)); - continue; - } - - int32_t bitBytes = BitmapLen(taosArrayGetSize(pGrpRows->pRows)); - for (; pNMatch->bitIdx < bitBytes; ++pNMatch->bitIdx) { - if (0 == build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx]) { - continue; - } - - char *v = &build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx]; - while (*v && !BLK_IS_FULL(pCtx->finBlk)) { - uint8_t n = lowest_bit_bitmap[((*v & (*v - 1)) ^ *v) % 11]; - MJ_ERR_RET(mFullJoinOutputSingleRow(pCtx, pGrpRows, 8 * pNMatch->bitIdx + n)); - MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, n); - if (++pGrpRows->rowMatchNum == taosArrayGetSize(pGrpRows->pRows)) { - pGrpRows->allRowsMatch = true; - pNMatch->bitIdx = bitBytes; - break; - } - } - - if (BLK_IS_FULL(pCtx->finBlk)) { - pCtx->nmatchRemains = true; - return TSDB_CODE_SUCCESS; - } - } - } - - pCtx->nmatchRemains = false; - pCtx->lastEqGrp = false; - return TSDB_CODE_SUCCESS; - } - + int32_t baseIdx = 0; + int32_t rowNum = 0; int32_t grpNum = taosArrayGetSize(build->eqGrps); for (; pNMatch->grpIdx < grpNum; ++pNMatch->grpIdx) { SMJoinGrpRows* pGrpRows = taosArrayGet(build->eqGrps, pNMatch->grpIdx); @@ -836,12 +889,22 @@ static int32_t mFullJoinHandleBuildGrpRemains(SMJoinMergeCtx* pCtx) { continue; } - if (pGrpRows->rowMatchNum <= 0) { - MJ_ERR_RET(mFullJoinOutputAllRows(pCtx, pGrpRows)); + if (pGrpRows->rowMatchNum <= 0 || pGrpRows->allRowsNMatch) { + pGrpRows->allRowsNMatch = true; + + MJ_ERR_RET(mFullJoinOutputMergeGrpRows(pCtx, pGrpRows, pNMatch)); + + if (BLK_IS_FULL(pCtx->finBlk)) { + pCtx->nmatchRemains = true; + return TSDB_CODE_SUCCESS; + } + continue; } int32_t bitBytes = BitmapLen(pGrpRows->endIdx - pGrpRows->beginIdx + 1); + baseIdx = 8 * pNMatch->bitIdx; + rowNum = pGrpRows->endIdx - pGrpRows->beginIdx + 1; for (; pNMatch->bitIdx < bitBytes; ++pNMatch->bitIdx) { if (0 == build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx]) { continue; @@ -850,12 +913,14 @@ static int32_t mFullJoinHandleBuildGrpRemains(SMJoinMergeCtx* pCtx) { char *v = &build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx]; while (*v && !BLK_IS_FULL(pCtx->finBlk)) { uint8_t n = lowest_bit_bitmap[((*v & (*v - 1)) ^ *v) % 11]; - SMJoinGrpRows grp = {0}; - grp.blk = pGrpRows->blk; - grp.readIdx = 8 * pNMatch->bitIdx + n; - grp.endIdx = grp.readIdx; - MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, &grp, false)); - colDataSetNull_f(build->pRowBitmap + pGrpRows->rowBitmapOffset + pNMatch->bitIdx, n); + MJ_ERR_RET(mFullJoinOutputMergeRow(pCtx, pGrpRows, baseIdx + n)); + + MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, n); + if (++pGrpRows->rowMatchNum == rowNum) { + pGrpRows->allRowsMatch = true; + pNMatch->bitIdx = bitBytes; + break; + } } } @@ -867,9 +932,14 @@ static int32_t mFullJoinHandleBuildGrpRemains(SMJoinMergeCtx* pCtx) { pCtx->nmatchRemains = false; pCtx->lastEqGrp = false; + return TSDB_CODE_SUCCESS; } +static int32_t mFullJoinHandleBuildTableRemains(SMJoinMergeCtx* pCtx) { + return pCtx->hashJoin ? mFullJoinHandleHashGrpRemains(pCtx) : mFullJoinHandleMergeGrpRemains(pCtx); +} + SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoin = pOperator->info; SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; @@ -898,7 +968,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { } if (pCtx->nmatchRemains) { - MJ_ERR_JRET(mFullJoinHandleBuildGrpRemains(pCtx)); + MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -924,7 +994,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); } } else if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) { - MJ_ERR_JRET(mFullJoinHandleBuildGrpRemains(pCtx)); + MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -945,7 +1015,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { } if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) { - MJ_ERR_JRET(mFullJoinHandleBuildGrpRemains(pCtx)); + MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -1026,17 +1096,17 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ switch (pJoin->joinType) { case JOIN_TYPE_INNER: - pCtx->hashCartFp = mInnerJoinHashCart; - pCtx->mergeCartFp = mInnerJoinMergeCart; + pCtx->hashCartFp = (joinCartFp)mInnerJoinHashCart; + pCtx->mergeCartFp = (joinCartFp)mInnerJoinMergeCart; break; case JOIN_TYPE_LEFT: case JOIN_TYPE_RIGHT: - pCtx->hashCartFp = mLeftJoinHashCart; - pCtx->mergeCartFp = mLeftJoinMergeCart; + pCtx->hashCartFp = (joinCartFp)mLeftJoinHashCart; + pCtx->mergeCartFp = (joinCartFp)mLeftJoinMergeCart; break; case JOIN_TYPE_FULL: - pCtx->hashCartFp = mFullJoinHashCart; - pCtx->mergeCartFp = mFullJoinMergeCart; + pCtx->hashCartFp = (joinCartFp)mFullJoinHashCart; + pCtx->mergeCartFp = (joinCartFp)mFullJoinMergeCart; break; default: break; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 262104ca32..a9ad1a7ab4 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -48,7 +48,7 @@ int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo } if (!build->pHashGrpRows->allRowsMatch && (status == FILTER_RESULT_ALL_QUALIFIED || status == FILTER_RESULT_PARTIAL_QUALIFIED)) { - if (status == FILTER_RESULT_ALL_QUALIFIED && taosArrayGetSize(build->pHashCurGrp) == pBlock.info.rows) { + if (status == FILTER_RESULT_ALL_QUALIFIED && taosArrayGetSize(build->pHashCurGrp) == pBlock->info.rows) { build->pHashGrpRows->allRowsMatch = true; } else { bool* pRes = (bool*)p->pData; @@ -122,7 +122,7 @@ int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SM buildGrp->rowMatchNum++; } - if (buildGrp->rowMatchNum == (buildGrp->endIdx - buildGrp->beginIdx + 1))) { + if (buildGrp->rowMatchNum == (buildGrp->endIdx - buildGrp->beginIdx + 1)) { buildGrp->allRowsMatch = true; } } @@ -323,7 +323,7 @@ bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, S } int32_t mJoinAllocGrpRowBitmap(SMJoinTableCtx* pTb) { - int32_t grpNum = taosArrayGetSize(pTb); + int32_t grpNum = taosArrayGetSize(pTb->eqGrps); for (int32_t i = 0; i < grpNum; ++i) { SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayGet(pTb->eqGrps, i); MJ_ERR_RET(mJoinGetRowBitmapOffset(pTb, pGrp->endIdx - pGrp->beginIdx + 1, &pGrp->rowBitmapOffset)); @@ -404,7 +404,7 @@ int32_t mJoinProcessNonEqualGrp(SMJoinMergeCtx* pCtx, SColumnInfoData* pCol, break; } - return mJoinNonEqCart(pCtx, pGrp, ); + return mJoinNonEqCart(pCtx, pGrp, probeGrp); } SOperatorInfo** mJoinBuildDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream) { @@ -633,7 +633,7 @@ static void mJoinDestroyCreatedBlks(SArray* pCreatedBlks) { taosArrayClear(pCreatedBlks); } -static int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t *rowBitmapOffset) { +int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t *rowBitmapOffset) { int32_t bitmapLen = BitmapLen(rowNum); int64_t reqSize = pTable->rowBitmapOffset + bitmapLen; if (reqSize > pTable->rowBitmapSize) { @@ -1059,6 +1059,7 @@ int32_t mJoinHandleConds(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJo break; } + return TSDB_CODE_SUCCESS; }