From 54ea94af38b82aa1e900e290106972dba6b893b7 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 12 Dec 2023 19:31:12 +0800 Subject: [PATCH] enh: support left join --- source/libs/executor/inc/hashjoin.h | 17 +- source/libs/executor/inc/mergejoin.h | 63 +- source/libs/executor/src/hashjoinoperator.c | 20 +- source/libs/executor/src/mergejoinoperator.c | 643 +++++++++++++++---- 4 files changed, 597 insertions(+), 146 deletions(-) diff --git a/source/libs/executor/inc/hashjoin.h b/source/libs/executor/inc/hashjoin.h index 0409f559c3..07a78f9008 100755 --- a/source/libs/executor/inc/hashjoin.h +++ b/source/libs/executor/inc/hashjoin.h @@ -37,14 +37,15 @@ typedef struct SHJoinCtx { } SHJoinCtx; typedef struct SHJoinColInfo { - int32_t srcSlot; - int32_t dstSlot; - bool keyCol; - bool vardata; - int32_t* offset; - int32_t bytes; - char* data; - char* bitMap; + int32_t srcSlot; + int32_t dstSlot; + bool keyCol; + bool vardata; + int32_t* offset; + int32_t bytes; + char* data; + char* bitMap; + SColumnInfoData* colData; } SHJoinColInfo; typedef struct SBufPageInfo { diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 8796547bce..52dae0aadd 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -20,8 +20,8 @@ extern "C" { #endif #define MJOIN_DEFAULT_BLK_ROWS_NUM 4096 -#define MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM (MJOIN_DEFAULT_BLK_ROWS_NUM * 2) #define MJOIN_HJOIN_CART_THRESHOLD 16 +#define MJOIN_BLK_SIZE_LIMIT 10485760 typedef SSDataBlock* (*joinImplFp)(SOperatorInfo*); @@ -32,12 +32,10 @@ typedef enum EJoinTableType { #define MJOIN_TBTYPE(_type) (E_JOIN_TB_BUILD == (_type) ? "BUILD" : "PROBE") -typedef enum EJoinPhase { - E_JOIN_PHASE_RETRIEVE, - E_JOIN_PHASE_SPLIT, - E_JOIN_PHASE_OUTPUT, - E_JOIN_PHASE_DONE -} EJoinPhase; +typedef struct SMJoinRowPos { + SSDataBlock* pBlk; + int32_t pos; +} SMJoinRowPos; typedef struct SMJoinColMap { int32_t srcSlot; @@ -53,10 +51,12 @@ typedef struct SMJoinColInfo { int32_t bytes; char* data; char* bitMap; + SColumnInfoData* colData; } SMJoinColInfo; typedef struct SMJoinTableInfo { + EJoinTableType type; int32_t downStreamIdx; SOperatorInfo* downStream; bool dsInitDone; @@ -70,9 +70,6 @@ typedef struct SMJoinTableInfo { int32_t finNum; SMJoinColMap* finCols; - - int32_t eqNum; - SMJoinColMap* eqCols; int32_t keyNum; SMJoinColInfo* keyCols; @@ -87,25 +84,45 @@ typedef struct SMJoinTableInfo { SArray* valVarCols; bool valColExist; - int32_t rowIdx; + SSDataBlock* blk; + int32_t blkRowIdx; + + int64_t grpRowsNum; + int64_t grpRemainRows; int32_t grpIdx; SArray* eqGrps; SArray* createdBlks; - SSDataBlock* blk; + + int32_t grpArrayIdx; + SArray* pGrpArrays; + + // hash join + int32_t grpRowIdx; + SArray* pHashCurGrp; + SSHashObj* pGrpHash; } SMJoinTableInfo; typedef struct SMJoinGrpRows { SSDataBlock* blk; int32_t beginIdx; - int32_t rowsNum; + int32_t endIdx; + int32_t readIdx; + bool readMatch; } SMJoinGrpRows; typedef struct SMJoinMergeCtx { bool hashCan; - bool rowRemains; + bool keepOrder; + bool grpRemains; + bool midRemains; bool eqCart; - int64_t curTs; - SMJoinGrpRows probeNEqGrps; + bool noColCond; + int32_t blksCapacity; + SSDataBlock* midBlk; + SSDataBlock* finBlk; + SSDataBlock* resBlk; + int64_t lastEqTs; + SMJoinGrpRows probeNEqGrp; bool hashJoin; } SMJoinMergeCtx; @@ -163,16 +180,24 @@ typedef struct SMJoinOperatorInfo { #define MJOIN_DS_NEED_INIT(_pOp, _tbctx) (MJOIN_DS_REQ_INIT(_pOp) && (!(_tbctx)->dsInitDone)) #define MJOIN_TB_LOW_BLK(_tbctx) ((_tbctx)->blkNum <= 0 || ((_tbctx)->blkNum == 1 && (_tbctx)->pHeadBlk->cloned)) -#define START_NEW_GRP(_ctx) memset(&(_ctx)->currGrpPair, 0, GRP_PAIR_INIT_SIZE) - -#define REACH_HJOIN_THRESHOLD(_pair) ((_pair)->buildIn.rowNum * (_pair)->probeIn.rowNum > MJOIN_HJOIN_CART_THRESHOLD) +#define REACH_HJOIN_THRESHOLD(_prb, _bld) ((_prb)->grpRowsNum * (_bld)->grpRowsNum > MJOIN_HJOIN_CART_THRESHOLD) #define SET_SAME_TS_GRP_HJOIN(_pair, _octx) ((_pair)->hashJoin = (_octx)->hashCan && REACH_HJOIN_THRESHOLD(_pair)) #define LEFT_JOIN_NO_EQUAL(_order, _pts, _bts) ((_order) && (_pts) < (_bts)) || (!(_order) && (_pts) > (_bts)) #define LEFT_JOIN_DISCRAD(_order, _pts, _bts) ((_order) && (_pts) > (_bts)) || (!(_order) && (_pts) < (_bts)) +#define GRP_REMAIN_ROWS(_grp) ((_grp)->endIdx - (_grp)->readIdx + 1) +#define GRP_DONE(_grp) ((_grp)->readIdx > (_grp)->endIdx) +#define BLK_IS_FULL(_blk) ((_blk)->info.rows == (_blk)->info.capacity) + + +#define SET_TABLE_CUR_TS(_col, _ts, _tb) \ + do { \ + (_col) = taosArrayGet((_tb)->blk->pDataBlock, (_tb)->primCol->srcSlot); \ + (_ts) = *((int64_t*)(_col)->pData + (_tb)->blkRowIdx); \ + } while (0) #ifdef __cplusplus } diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index f8aa9323b7..1c0ef27ae2 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -402,11 +402,14 @@ static FORCE_INLINE void appendHJoinResToBlock(struct SOperatorInfo* pOperator, } -static FORCE_INLINE void copyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) { +static FORCE_INLINE bool copyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) { char *pData = NULL; size_t bufLen = 0; if (1 == pTable->keyNum) { + if (colDataIsNull_s(pTable->keyCols[0].colData, rowIdx)) { + return true; + } if (pTable->keyCols[0].vardata) { pData = pTable->keyCols[0].data + pTable->keyCols[0].offset[rowIdx]; bufLen = varDataTLen(pData); @@ -417,6 +420,9 @@ static FORCE_INLINE void copyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t r pTable->keyData = pData; } else { for (int32_t i = 0; i < pTable->keyNum; ++i) { + if (colDataIsNull_s(pTable->keyCols[i].colData, rowIdx)) { + return true; + } if (pTable->keyCols[i].vardata) { pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx]; memcpy(pTable->keyBuf + bufLen, pData, varDataTLen(pData)); @@ -433,6 +439,8 @@ static FORCE_INLINE void copyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t r if (pBufLen) { *pBufLen = bufLen; } + + return false; } @@ -458,7 +466,10 @@ static void doHashJoinImpl(struct SOperatorInfo* pOperator) { } for (; pCtx->probeIdx < pCtx->pProbeData->info.rows; ++pCtx->probeIdx) { - copyKeyColsDataToBuf(pProbe, pCtx->probeIdx, &bufLen); + if (copyKeyColsDataToBuf(pProbe, pCtx->probeIdx, &bufLen)) { + continue; + } + SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen); /* size_t keySize = 0; @@ -501,6 +512,7 @@ static int32_t setKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { if (pTable->keyCols[i].vardata) { pTable->keyCols[i].offset = pCol->varmeta.offset; } + pTable->keyCols[i].colData = pCol; } return TSDB_CODE_SUCCESS; @@ -681,7 +693,9 @@ static int32_t addBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin size_t bufLen = 0; for (int32_t i = 0; i < pBlock->info.rows; ++i) { - copyKeyColsDataToBuf(pBuild, i, &bufLen); + if (copyKeyColsDataToBuf(pBuild, i, &bufLen)) { + continue; + } code = addRowToHash(pJoin, pBlock, bufLen, i); if (code) { return code; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 3efefb3e74..f96290dbb7 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -164,7 +164,7 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi SMJoinTableInfo* pTable = &pJoin->tbs[idx]; pTable->downStream = pDownstream[idx]; pTable->blkId = pDownstream[idx]->resultDataBlockId; - int32_t code = mJoinInitPrimKeyInfo(pTable, (0 == idx) ? pJoinNode->LeftPrimSlotId : pJoinNode->rightPrimSlotId); + int32_t code = mJoinInitPrimKeyInfo(pTable, (0 == idx) ? pJoinNode->leftPrimSlotId : pJoinNode->rightPrimSlotId); if (code) { return code; } @@ -217,6 +217,9 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin pInfo->build->downStreamIdx = buildIdx; pInfo->probe->downStreamIdx = probeIdx; + + pInfo->build->type = E_JOIN_TB_BUILD; + pInfo->probe->type = E_JOIN_TB_PROBE; } static int32_t mJoinBuildResColMap(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode) { @@ -267,10 +270,8 @@ static int32_t mJoinInitBufPages(SMJoinOperatorInfo* pInfo) { static int32_t mJoinInitMergeCtx(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; - pProbeCtx->type = E_JOIN_TB_PROBE; - pBuildCtx->type = E_JOIN_TB_BUILD; - - pCtx->hashCan = pJoin->probe->eqNum > 0; + pCtx->lastEqTs = INT64_MIN; + pCtx->hashCan = pJoin->probe->keyNum > 0; pCtx->probeEqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows)); pCtx->probeCreatedBlks = taosArrayInit(8, POINTER_BYTES); @@ -279,17 +280,15 @@ static int32_t mJoinInitMergeCtx(SOperatorInfo* pOperator, SMJoinOperatorInfo* p pCtx->buildCreatedBlks = taosArrayInit(8, POINTER_BYTES); if (pJoin->pFPreFilter) { - pCtx->outputCtx.cartCtx.pResBlk = createOneDataBlock(pJoin->pRes); - blockDataEnsureCapacity(pCtx->outputCtx.cartCtx.pResBlk, MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM); - pCtx->outputCtx.cartCtx.resThreshold = MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM * 0.75; - } else { - pCtx->outputCtx.cartCtx.pResBlk = pJoin->pRes; - pCtx->outputCtx.cartCtx.resThreshold = pOperator->resultInfo.threshold; + pCtx->midBlk = createOneDataBlock(pJoin->pResBlk, false); + blockDataEnsureCapacity(pCtx->midBlk, pJoin->pResBlk->info.rows); } - if (!pCtx->outputCtx.hashCan && NULL == pJoin->pFPreFilter) { - pCtx->outputCtx.cartCtx.appendRes = true; - } + pCtx->finBlk = pJoin->pResBlk; + + pCtx->blksCapacity = pJoin->pResBlk->info.rows * 2; + + pCtx->resBlk = NULL; return TSDB_CODE_SUCCESS; } @@ -365,43 +364,223 @@ static FORCE_INLINE void mJoinResetTsJoinCtx(SMJoinTsJoinCtx* pCtx) { } -static void mLeftJoinCart(SMJoinCartCtx* pCtx) { - int32_t currRows = pCtx->appendRes ? pCtx->pResBlk->info.rows : 0; +static void mLeftJoinGrpNonEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst) { + SMJoinTableInfo* probe = pJoin->probe; + SMJoinTableInfo* build = pJoin->build; + int32_t currRows = append ? pRes->info.rows : 0; + int32_t firstRows = GRP_REMAIN_ROWS(pFirst); - for (int32_t c = 0; c < pCtx->firstColNum; ++c) { - SMJoinColMap* pFirstCol = pCtx->pFirstCols + c; - SColumnInfoData* pInCol = taosArrayGet(pCtx->pFirstBlk->pBlk, pFirstCol->srcSlot); - SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pFirstCol->dstSlot); - for (int32_t r = 0; r < pCtx->firstRowNum; ++r) { - if (colDataIsNull_s(pInCol, pCtx->firstRowIdx + r)) { - colDataSetNItemsNull(pOutCol, currRows + r * pCtx->secondRowNum, pCtx->secondRowNum); - } else { - colDataSetNItems(pOutCol, currRows + r * pCtx->secondRowNum, colDataGetData(pInCol, pCtx->firstRowIdx + r), pCtx->secondRowNum, true); - } - } + for (int32_t c = 0; c < probe->finNum; ++c) { + SMJoinColMap* pFirstCol = probe->finCols + c; + SColumnInfoData* pInCol = taosArrayGet(pFirst->blk, pFirstCol->srcSlot); + SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot); + colDataAssignNRows(pOutCol, currRows, pInCol, pFirst->readIdx, firstRows); } - - if (pCtx->firstOnly) { - ASSERT(1 == pCtx->secondRowNum); - for (int32_t c = 0; c < pCtx->secondColNum; ++c) { - SMJoinColMap* pSecondCol = pCtx->pSecondCols + c; - SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pSecondCol->dstSlot); - colDataSetNItemsNull(pOutCol, currRows, pCtx->firstRowNum); - } - } else { - for (int32_t c = 0; c < pCtx->secondColNum; ++c) { - SMJoinColMap* pSecondCol = pCtx->pSecondCols + c; - SColumnInfoData* pInCol = taosArrayGet(pCtx->pSecondBlk->pBlk, pSecondCol->srcSlot); - SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pSecondCol->dstSlot); - for (int32_t r = 0; r < pCtx->firstRowNum; ++r) { - colDataAssignNRows(pOutCol, currRows + r * pCtx->secondRowNum, pInCol, pCtx->secondRowIdx, pCtx->secondRowNum); - } - } + + for (int32_t c = 0; c < build->finNum; ++c) { + SMJoinColMap* pSecondCol = build->finCols + c; + SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot); + colDataSetNItemsNull(pOutCol, currRows, firstRows); } - - pCtx->pResBlk.info.rows += pCtx->firstRowNum * pCtx->secondRowNum; + + pRes->info.rows = append ? (pRes->info.rows + firstRows) : firstRows; } +static void mLeftJoinGrpEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond) { + SMJoinTableInfo* probe = pJoin->probe; + SMJoinTableInfo* build = pJoin->build; + int32_t currRows = append ? pRes->info.rows : 0; + int32_t firstRows = GRP_REMAIN_ROWS(pFirst); + int32_t secondRows = GRP_REMAIN_ROWS(pSecond); + + for (int32_t c = 0; c < probe->finNum; ++c) { + SMJoinColMap* pFirstCol = probe->finCols + c; + SColumnInfoData* pInCol = taosArrayGet(pFirst->blk, pFirstCol->srcSlot); + SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot); + for (int32_t r = 0; r < firstRows; ++r) { + if (colDataIsNull_s(pInCol, pFirst->readIdx + r)) { + colDataSetNItemsNull(pOutCol, currRows + r * secondRows, secondRows); + } else { + colDataSetNItems(pOutCol, currRows + r * secondRows, colDataGetData(pInCol, pFirst->beginIdx + r), secondRows, true); + } + } + } + + for (int32_t c = 0; c < build->finNum; ++c) { + SMJoinColMap* pSecondCol = build->finCols + c; + SColumnInfoData* pInCol = taosArrayGet(pSecond->blk, pSecondCol->srcSlot); + SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot); + for (int32_t r = 0; r < firstRows; ++r) { + colDataAssignNRows(pOutCol, currRows + r * secondRows, pInCol, pSecond->readIdx, secondRows); + } + } + + pRes->info.rows = append ? (pRes->info.rows + firstRows) : firstRows; +} + + +static void mLeftJoinMergeFullCart(SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { + int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; + SMJoinTableInfo* probe = pJoin->probe; + SMJoinTableInfo* build = pJoin->build; + SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); + int32_t buildGrpNum = taosArrayGetSize(build->eqGrps); + int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); + + pCtx->eqCart = true; + + if (probeRows * build->grpRemainRows <= rowsLeft) { + for (; build->grpIdx < buildGrpNum; ++build->grpIdx) { + SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); + mLeftJoinGrpEqCart(pJoin, pCtx->finBlk, true, probeGrp, buildGrp); + } + probe->grpIdx++; + build->grpRemainRows = 0; + pCtx->grpRemains = false; + return true; + } + + for (; probeGrp->readIdx <= probeGrp->endIdx; ++probeGrp->readIdx) { + for (; build->grpIdx < buildGrpNum; ++build->grpIdx) { + SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); + int32_t probeEndIdx = probeGrp->endIdx; + probeGrp->endIdx = probeGrp->readIdx; + + if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) { + mLeftJoinGrpEqCart(pJoin, pCtx->finBlk, true, probeGrp, buildGrp); + rowsLeft -= GRP_REMAIN_ROWS(buildGrp); + probeGrp->endIdx = probeEndIdx; + continue; + } + + int32_t buildEndIdx = buildGrp->endIdx; + buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1; + mLeftJoinGrpEqCart(pJoin, pCtx->finBlk, true, probeGrp, buildGrp); + buildGrp->readIdx += rowsLeft; + buildGrp->endIdx = buildEndIdx; + rowsLeft = 0; + break; + } + + if (rowsLeft <= 0) { + break; + } + } + + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; +} + +static void mLeftJoinCopyMergeMidBlk(SSDataBlock* pMid, SSDataBlock* pFin) { + SSDataBlock* pLess = NULL; + SSDataBlock* pMore = NULL; + if (pMid->info.rows < pFin->info.rows) { + pLess = pMid; + pMore = pFin; + } else { + pLess = pFin; + pMore = pMid; + } + + int32_t totalRows = pMid->info.rows + pFin->info.rows; + if (totalRows <= pMore->info.capacity) { + blockDataMerge(pMore, pLess); + } else { + + } +} + +static void mLeftJoinMergeSeqCart(SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { + SMJoinTableInfo* probe = pJoin->probe; + SMJoinTableInfo* build = pJoin->build; + SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); + int32_t buildGrpNum = taosArrayGetSize(build->eqGrps); + int32_t probeEndIdx = probeGrp->endIdx; + int32_t rowsLeft = pCtx->midBlk->info.capacity; + bool contLoop = true; + + pCtx->eqCart = true; + + do { + for (; !GRP_DONE(probeGrp->readIdx) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) { + probeGrp->endIdx = probeGrp->readIdx; + for (; build->grpIdx < buildGrpNum; ++build->grpIdx) { + SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); + + if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) { + mLeftJoinGrpEqCart(pJoin, pCtx->midBlk, true, probeGrp, buildGrp); + rowsLeft -= GRP_REMAIN_ROWS(buildGrp); + continue; + } + + int32_t buildEndIdx = buildGrp->endIdx; + buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1; + mLeftJoinGrpEqCart(pJoin, pCtx->midBlk, true, probeGrp, buildGrp); + buildGrp->readIdx += rowsLeft; + buildGrp->endIdx = buildEndIdx; + rowsLeft = 0; + break; + } + + doFilter(pCtx->midBlk, pJoin->pFPreFilter, NULL); + if (pCtx->midBlk->info.rows > 0) { + probeGrp->readMatch = true; + } else if (build->grpIdx == buildGrpNum && !probeGrp->readMatch) { + mLeftJoinGrpNonEqCart(pJoin, pCtx->finBlk, true, probeGrp); + continue; + } + + if (pCtx->midBlk->info.rows >= pJoin->pOperator->resultInfo.threshold) { + contLoop = false; + break; + } + + rowsLeft = pCtx->midBlk->info.capacity - pCtx->midBlk->info.rows; + + mLeftJoinCopyMergeMidBlk(&pCtx->midBlk, &pCtx->finBlk); + break; + } + + if (GRP_DONE(probeGrp->readIdx) || BLK_IS_FULL(pCtx->finBlk)) { + break; + } + } while (contLoop); + + probeGrp->endIdx = probeEndIdx; + + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; +} + +static void mLeftJoinMergeCart(SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { + if (NULL == pJoin->pFPreFilter) { + mLeftJoinMergeFullCart(pJoin, pCtx); + } else { + mLeftJoinMergeSeqCart(pJoin, pCtx); + } +} + +static void mLeftJoinNonEqCart(SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { + int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; + SMJoinGrpRows* probeGrp = &pCtx->probeNEqGrp; + int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); + + pCtx->eqCart = false; + + if (probeRows <= rowsLeft) { + mLeftJoinGrpNonEqCart(pJoin, pCtx->finBlk, true, probeGrp, NULL); + probeGrp->readIdx = probeGrp->endIdx + 1; + pCtx->grpRemains = false; + } else { + int32_t probeEndIdx = probeGrp->endIdx; + probeGrp->endIdx = probeGrp->readIdx + rowsLeft - 1; + mLeftJoinGrpNonEqCart(pJoin, pCtx->finBlk, true, probeGrp, NULL); + probeGrp->readIdx = probeGrp->endIdx + 1; + probeGrp->endIdx = probeEndIdx; + pCtx->grpRemains = true; + } +} + + + static bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableInfo* ppTb) { if (!(*ppTb)->dsFetchDone && (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows)) { (*ppBlk) = getNextBlockFromDownstreamRemain(pJoin->pOperator, (*ppTb)->downStreamIdx); @@ -421,12 +600,12 @@ static bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBl static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { - bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->rowIdx, &pJoin->probe->blk, &pJoin->probe); + bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, &pJoin->probe); bool buildGot = false; do { if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { - buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->rowIdx, &pJoin->build->blk, &pJoin->build); + buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, &pJoin->build); } if (NULL == pJoin->probe->blk) { @@ -435,7 +614,7 @@ static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoi } else if (buildGot && probeGot) { SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot); SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot); - if (*((int64_t*)pProbeCol->pData + pJoin->probe->rowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) { + if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) { continue; } } @@ -494,16 +673,21 @@ static void mJoinBuildEqGroups(SOperatorInfo* pOperator, SMJoinTableInfo* pTable pGrp = taosArrayReserve(pTable->eqGrps, 1); } - pGrp->beginIdx = pTable->rowIdx++; - pGrp->rowsNum = 1; + pGrp->beginIdx = pTable->blkRowIdx++; + pGrp->readIdx = pGrp->beginIdx; + pGrp->endIdx = pGrp->beginIdx; + pGrp->readMatch = false; pGrp->blk = pTable->blk; - for (; pTable->rowIdx < pTable->blk->info.rows; ++pTable->rowIdx) { - char* pNextVal = colDataGetData(pCol, pTable->rowIdx); + for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) { + char* pNextVal = colDataGetData(pCol, pTable->blkRowIdx); if (timestamp == *(int64_t*)pNextVal) { - pGrp->rowsNum++; + pGrp->endIdx++; continue; } + + pTable->grpRowsNum += pGrp->endIdx - pGrp->beginIdx + 1; + pTable->grpRemainRows = pTable->grpRowsNum; return; } @@ -512,18 +696,19 @@ static void mJoinBuildEqGroups(SOperatorInfo* pOperator, SMJoinTableInfo* pTable if (0 == pGrp->beginIdx) { pGrp->blk = createOneDataBlock(pTable->blk, true); } else { - pGrp->blk = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, pGrp->rowsNum - pGrp->beginIdx); + pGrp->blk = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, pGrp->endIdx - pGrp->beginIdx + 1); } taosArrayPush(pTable->createdBlks, &pGrp->blk); pGrp->beginIdx = 0; } + + pTable->grpRowsNum += pGrp->endIdx - pGrp->beginIdx + 1; + pTable->grpRemainRows = pTable->grpRowsNum; } static int32_t mJoinRetrieveSameTsRows(SOperatorInfo* pOperator, SMJoinTableInfo* pTable, int64_t timestamp) { SMJoinOperatorInfo* pJoin = pOperator->info; - int32_t endPos = -1; - SSDataBlock* dataBlock = startDataBlock; bool allBlk = false; mJoinBuildEqGroups(pOperator, pTable, timestamp, &allBlk, true); @@ -532,7 +717,7 @@ static int32_t mJoinRetrieveSameTsRows(SOperatorInfo* pOperator, SMJoinTableInfo pTable->blk = getNextBlockFromDownstreamRemain(pOperator, pTable->downStreamIdx); qDebug("merge join %s table got block for same ts, rows:%" PRId64, MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0); - pTable->rowIdx = 0; + pTable->blkRowIdx = 0; if (NULL == pTable->blk) { pTable->dsFetchDone = true; @@ -546,96 +731,323 @@ static int32_t mJoinRetrieveSameTsRows(SOperatorInfo* pOperator, SMJoinTableInfo return 0; } -static int32_t mJoinEqualCart(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes) { - int32_t code = TSDB_CODE_SUCCESS; - SMJoinOperatorInfo* pJoin = pOperator->info; - SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; - SSHashObj* rightTableHash = NULL; - bool rightUseBuildTable = false; - - if (!pCtx->rowRemains) { - mJoinBuildEqGroups(pOperator, pJoin->probe, timestamp, NULL, true); - mJoinRetrieveSameTsRows(pOperator, pJoin->build, timestamp); - if (pJoinInfo->pColEqualOnConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) { - mergeJoinFillBuildTable(pJoinInfo, rightRowLocations); - pCtx->hashJoin = true; - taosArrayDestroy(rightRowLocations); - rightRowLocations = NULL; +static int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableInfo* pTable) { + for (int32_t i = 0; i < pTable->keyNum; ++i) { + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot); + if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { + qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->keyCols[i].srcSlot, pCol->info.type, pTable->keyCols[i].vardata); + return TSDB_CODE_INVALID_PARA; } + if (pTable->keyCols[i].bytes != pCol->info.bytes) { + qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pTable->keyCols[i].srcSlot, pCol->info.bytes, pTable->keyCols[i].bytes); + return TSDB_CODE_INVALID_PARA; + } + pTable->keyCols[i].data = pCol->pData; + if (pTable->keyCols[i].vardata) { + pTable->keyCols[i].offset = pCol->varmeta.offset; + } + pTable->keyCols[i].colData = pCol; } - bool reachThreshold = false; - - if (code == TSDB_CODE_SUCCESS) { - mLeftJoinCart(pOperator, pRes, nRows, leftRowLocations, leftRowIdx, - rightRowIdx, pCtx->hashJoin, rightRowLocations, &reachThreshold); - } - - if (!reachThreshold) { - mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks, - pCtx->hashJoin, rightRowLocations); + return TSDB_CODE_SUCCESS; +} +static FORCE_INLINE true mJoinCopyKeyColsDataToBuf(SMJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) { + char *pData = NULL; + size_t bufLen = 0; + + if (1 == pTable->keyNum) { + if (colDataIsNull_s(pTable->keyCols[0].colData, rowIdx)) { + return true; + } + if (pTable->keyCols[0].vardata) { + pData = pTable->keyCols[0].data + pTable->keyCols[0].offset[rowIdx]; + bufLen = varDataTLen(pData); + } else { + pData = pTable->keyCols[0].data + pTable->keyCols[0].bytes * rowIdx; + bufLen = pTable->keyCols[0].bytes; + } + pTable->keyData = pData; } else { - pJoinInfo->rowCtx.rowRemains = true; - pJoinInfo->rowCtx.ts = timestamp; - pJoinInfo->rowCtx.leftRowLocations = leftRowLocations; - pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks; - pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks; - pJoinInfo->rowCtx.rightRowLocations = rightRowLocations; + for (int32_t i = 0; i < pTable->keyNum; ++i) { + if (colDataIsNull_s(pTable->keyCols[i].colData, rowIdx)) { + return true; + } + if (pTable->keyCols[i].vardata) { + pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx]; + memcpy(pTable->keyBuf + bufLen, pData, varDataTLen(pData)); + bufLen += varDataTLen(pData); + } else { + pData = pTable->keyCols[i].data + pTable->keyCols[i].bytes * rowIdx; + memcpy(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes); + bufLen += pTable->keyCols[i].bytes; + } + } + pTable->keyData = pTable->keyBuf; } + + if (pBufLen) { + *pBufLen = bufLen; + } + + return false; +} + +static int32_t mJoinGetAvailableGrpArray(SMJoinTableInfo* pTable, SArray** ppRes) { + do { + if (pTable->grpArrayIdx < taosArrayGetSize(pTable->pGrpArrays)) { + *ppRes = taosArrayGetP(pTable->pGrpArrays, pTable->grpArrayIdx++); + return TSDB_CODE_SUCCESS; + } + + SArray* pNew = taosArrayInit(4, sizeof(SMJoinRowPos)); + if (NULL == pNew) { + return TSDB_CODE_OUT_OF_MEMORY; + } + taosArrayPush(pTable->pGrpArrays, &pNew); + } while (true); + + return TSDB_CODE_SUCCESS; +} + +static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDataBlock* pBlock, int32_t rowIdx) { + SMJoinTableInfo* pBuild = pJoin->build; + SMJoinRowPos pos = {pBlock, rowIdx}; + SArray** pGrpRows = tSimpleHashGet(pBuild->pGrpHash, pBuild->keyData, keyLen); + if (!pGrpRows) { + SArray* pNewGrp = NULL; + int32_t code = mJoinGetAvailableGrpArray(pBuild, &pNewGrp); + if (code) { + return code; + } + taosArrayPush(pNewGrp, &pos); + tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, POINTER_BYTES); + } else { + taosArrayPush(*pGrpRows, &pos); + } + return TSDB_CODE_SUCCESS; } +static void mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableInfo* pTable) { + int32_t grpNum = taosArrayGetSize(pTable->eqGrps); + for (int32_t g = 0; g < grpNum; ++g) { + SMJoinGrpRows* pGrp = taosArrayGet(pTable->eqGrps, g); + int32_t code = mJoinSetKeyColsData(pGrp->blk, pTable); + if (code) { + return code; + } + + int32_t grpRows = GRP_REMAIN_ROWS(pGrp); + size_t bufLen = 0; + for (int32_t r = 0; r < grpRows; ++r) { + if (mJoinCopyKeyColsDataToBuf(pTable, r, &bufLen)) { + continue; + } + code = mJoinAddRowToHash(pJoin, bufLen, pGrp->blk, pGrp->beginIdx + r); + if (code) { + return code; + } + } + } +} + +static bool mLeftJoinHashRowCart(SMJoinMergeCtx* pCtx, SMJoinGrpRows* probeGrp, SMJoinTableInfo* probe, SMJoinTableInfo* build) { + int32_t rowsLeft = pCtx->resBlk->info.capacity - pCtx->resBlk->info.rows; + int32_t buildGrpRows = taosArrayGetSize(build->pHashCurGrp); + int32_t grpRows = buildGrpRows - build->grpRowIdx; + int32_t actRows = TMIN(grpRows, rowsLeft); + int32_t currRows = pCtx->noColCond ? pCtx->resBlk->info.rows : 0; + + for (int32_t c = 0; c < probe->finNum; ++c) { + SMJoinColMap* pFirstCol = probe->finCols + c; + SColumnInfoData* pInCol = taosArrayGet(probeGrp->blk, pFirstCol->srcSlot); + SColumnInfoData* pOutCol = taosArrayGet(pCtx->resBlk->pDataBlock, pFirstCol->dstSlot); + if (colDataIsNull_s(pInCol, probeGrp->readIdx)) { + colDataSetNItemsNull(pOutCol, currRows, actRows); + } else { + colDataSetNItems(pOutCol, currRows, colDataGetData(pInCol, probeGrp->beginIdx), actRows, true); + } + } + + for (int32_t c = 0; c < build->finNum; ++c) { + SMJoinColMap* pSecondCol = build->finCols + c; + SColumnInfoData* pOutCol = taosArrayGet(pCtx->resBlk->pDataBlock, pSecondCol->dstSlot); + for (int32_t r = 0; r < actRows; ++r) { + SMJoinRowPos* pRow = taosArrayGet(build->pHashCurGrp, r); + SColumnInfoData* pInCol = taosArrayGet(pRow->pBlk, pSecondCol->srcSlot); + colDataAssignNRows(pOutCol, currRows + r, pInCol, pRow->pos, 1); + } + } + + pCtx->resBlk->info.rows += actRows; + if (actRows == grpRows) { + build->grpRowIdx = -1; + } else { + build->grpRowIdx += actRows; + } + + if (actRows == rowsLeft) { + return false; + } + + return true; +} + +static void mLeftJoinHashCart(SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { + SMJoinTableInfo* probe = pJoin->probe; + SMJoinTableInfo* build = pJoin->build; + SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); + + if (pJoin->build->grpRowIdx >= 0) { + bool contLoop = mLeftJoinHashRowCart(pCtx, probeGrp, probe, build); + if (build->grpRowIdx < 0) { + probeGrp->readIdx++; + } + + if (!contLoop) { + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + return; + } + } + + pCtx->eqCart = true; + + size_t bufLen = 0; + for (; probeGrp->readIdx < probeGrp->endIdx; ++probeGrp->readIdx) { + if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) { + continue; + } + + SArray** pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen); + if (NULL == pGrp) { + + } else { + build->pHashCurGrp = *pGrp; + build->grpRowIdx = 0; + if (NULL == pJoin->pPreFilter) { + if (!mLeftJoinHashRowCart(pCtx, probeGrp, probe, build)) { + break; + } + } else { + + } + } + } + + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; +} + +static int32_t mLeftJoinProcessEqualGrp(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes) { + SMJoinOperatorInfo* pJoin = pOperator->info; + SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; + + mJoinBuildEqGroups(pOperator, pJoin->probe, timestamp, NULL, true); + mJoinRetrieveSameTsRows(pOperator, pJoin->build, timestamp); + if (pCtx->hashCan && REACH_HJOIN_THRESHOLD(pJoin->probe, pJoin->build)) { + int32_t code = mJoinMakeBuildTbHash(pJoin, pJoin->build); + if (code) { + return code; + } + code = mJoinSetKeyColsData(pJoin->probe->blk, pJoin->probe); + if (code) { + return code; + } + + pCtx->hashJoin = true; + mLeftJoinHashCart(pJoin, pCtx); + } else { + mLeftJoinMergeCart(pJoin, pCtx); + } + + return TSDB_CODE_SUCCESS; +} + + +static bool mLeftJoinHandleRowRemains(struct SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx, SSDataBlock* pRes) { + if (pCtx->eqCart) { + if (pCtx->hashJoin) { + mLeftJoinHashCart(pJoin, pCtx); + } else { + mLeftJoinMergeCart(pJoin, pCtx); + } + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + return false; + } + + return true; + } + + mLeftJoinNonEqCart(pJoin, pCtx); + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + return false; + } + + return true; +} static void mLeftJoinDo(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { SMJoinOperatorInfo* pJoin = pOperator->info; SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; - int64_t probeTs = INT64_MIN; + int64_t probeTs = 0; int64_t buildTs = 0; SColumnInfoData* pBuildCol = NULL; SColumnInfoData* pProbeCol = NULL; + bool asc = (pJoin->inputTsOrder == TSDB_ORDER_ASC) ? true : false; - bool asc = (pJoin->inputOrder == TSDB_ORDER_ASC) ? true : false; + if (pCtx->grpRemains && !mLeftJoinHandleRowRemains(pOperator, pJoin, pCtx, pRes)) { + return; + } do { - if (pCtx->rowRemains) { - probeTs = buildTs = pCtx->curTs; - } else { - if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx, probeTs)) { - break; - } - - pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot); - pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot); - probeTs = *((int64_t*)pProbeCol->pData + pJoin->probe->rowIdx); - buildTs = *((int64_t*)pBuildCol->pData + pCtx->buildIdx); + if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) { + break; } - while (pCtx->probeIdx < pJoin->probe->blk->info.rows && pCtx->buildIdx < pJoin->build->blk->info.rows) { + SET_TABLE_CUR_TS(pBuildCol, buildTs, pJoin->build); + SET_TABLE_CUR_TS(pProbeCol, probeTs, pJoin->probe); + + if (probeTs == pCtx->lastEqTs) { + mLeftJoinProcessEqualGrp(pOperator, probeTs, pRes); + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + return; + } + + if (pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) { + SET_TABLE_CUR_TS(pProbeCol, probeTs, pJoin->probe); + } else { + continue; + } + } + + while (pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows && pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { if (probeTs == buildTs) { - mJoinEqualCart(pOperator, probeTs, pRes); + pCtx->lastEqTs = probeTs; + mLeftJoinProcessEqualGrp(pOperator, probeTs, pRes); if (pRes->info.rows >= pOperator->resultInfo.threshold) { return; } - break; + + SET_TABLE_CUR_TS(pBuildCol, buildTs, pJoin->build); + SET_TABLE_CUR_TS(pProbeCol, probeTs, pJoin->probe); } else if (LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)) { - pCtx->probeNEqGrps.beginIdx = pCtx->probeIdx; + pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx; do { - pCtx->probeNEqGrps.rowsNum++; - probeTs = *((int64_t*)pProbeCol->pData + (++pCtx->probeIdx)); - } while (pCtx->probeIdx < pJoin->probe->blk->info.rows && LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)); + pCtx->probeNEqGrp.endIdx = pJoin->probe->blkRowIdx; + probeTs = *((int64_t*)pProbeCol->pData + (++pJoin->probe->blkRowIdx)); + } while (pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows && LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)); - mJoinNonEqualCart(pOperator, &pCtx->probeNEqGrps, pRes); + mLeftJoinNonEqCart(pJoin, pCtx); if (pRes->info.rows >= pOperator->resultInfo.threshold) { return; } } else { - buildTs = *((int64_t*)pBuildCol->pData + (++pCtx->buildIdx)); - while (pCtx->buildIdx < pJoin->build->blk->info.rows && LEFT_JOIN_DISCRAD(asc, probeTs, buildTs)) { - buildTs = *((int64_t*)pBuildCol->pData + (++pCtx->buildIdx)); + buildTs = *((int64_t*)pBuildCol->pData + (++pJoin->build->blkRowIdx)); + while (pJoin->build->blkRowIdx < pJoin->build->blk->info.rows && LEFT_JOIN_DISCRAD(asc, probeTs, buildTs)) { + buildTs = *((int64_t*)pBuildCol->pData + (++pJoin->build->blkRowIdx)); } } } @@ -711,9 +1123,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t } int32_t numOfCols = 0; - pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); - initResultSizeInfo(&pOperator->resultInfo, MJOIN_DEFAULT_BLK_ROWS_NUM); - blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + pInfo->pResBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); + blockDataEnsureCapacity(pInfo->pResBlk, TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc.totalRowSize)); setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);