diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 918df31f00..3faf5c1b57 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -53,25 +53,9 @@ typedef struct SMJoinColInfo { char* bitMap; } SMJoinColInfo; -typedef struct SMJoinCartGrp { - bool sameTsGrp; - bool firstArrIdx; - bool secondArrIdx; - int32_t firstRowIdx; - int32_t firstRowNum; - int32_t secondRowIdx; - int32_t secondRowNum; -} SMJoinCartGrp; - -typedef struct SMJoinCartBlk { - SSDataBlock* pFirstBlk; - SSDataBlock* pSecondBlk; - SArray* pBlkGrps; -} SMJoinCartBlk; - - typedef struct SMJoinCartCtx { bool appendRes; + bool firstOnly; int32_t resThreshold; SSDataBlock* pResBlk; @@ -80,12 +64,17 @@ typedef struct SMJoinCartCtx { int32_t secondColNum; SMJoinColMap* pSecondCols; - SArray* pCartRowIdx; - SArray* pCartBlks; + SMJoinBlkInfo* pFirstBlk; + SMJoinBlkInfo* pSecondBlk; + int32_t firstRowIdx; + int32_t firstRowNum; + int32_t secondRowIdx; + int32_t secondRowNum; } SMJoinCartCtx; typedef struct SMJoinBlkInfo { bool cloned; + bool inUse; SSDataBlock* pBlk; void* pNext; } SMJoinBlkInfo; @@ -142,41 +131,44 @@ typedef struct SMJoinTsJoinCtx { } SMJoinTsJoinCtx; typedef struct SBuildGrpResIn { - bool multiBlkGrp; - SMJoinBlkInfo* grpBeginBlk; - int32_t grpRowBeginIdx; - int32_t grpRowNum; + bool multiBlk; + SMJoinBlkInfo* pBeginBlk; + int32_t rowBeginIdx; + int32_t rowNum; } SBuildGrpResIn; typedef struct SBuildGrpResOut { - SSHashObj* pGrpHash; - int32_t grpRowReadIdx; - int32_t grpRowGReadIdx; + SSHashObj* pHash; + SMJoinBlkInfo* pCurrBlk; + int32_t rowReadIdx; + int32_t rowGReadNum; } SBuildGrpResOut; typedef struct SProbeGrpResIn { bool allRowsGrp; - SMJoinBlkInfo* grpBeginBlk; - int32_t grpRowBeginIdx; - int32_t grpRowNum; + SMJoinBlkInfo* pBeginBlk; + int32_t rowBeginIdx; + int32_t rowNum; int64_t grpLastTs; } SProbeGrpResIn; typedef struct SProbeGrpResOut { - int32_t grpRowReadIdx; + SMJoinBlkInfo* pCurrBlk; + int32_t rowReadIdx; + int32_t rowGReadNum; } SProbeGrpResOut; typedef struct SGrpPairRes { bool sameTsGrp; bool finishGrp; bool hashJoin; - SProbeGrpResIn probeIn; - SBuildGrpResIn buildIn; + SProbeGrpResIn prbIn; + SBuildGrpResIn bldIn; /* KEEP THIS PART AT THE END */ bool outBegin; - SBuildGrpResOut buildOut; - SProbeGrpResOut probeOut; + SBuildGrpResOut bldOut; + SProbeGrpResOut prbOut; /* KEEP THIS PART AT THE END */ } SGrpPairRes; @@ -269,10 +261,12 @@ typedef struct SMJoinOperatorInfo { #define START_NEW_GRP(_ctx) memset(&(_ctx)->currGrpPair, 0, GRP_PAIR_INIT_SIZE) -#define REACH_HJOIN_THRESHOLD(_pair) ((_pair)->buildIn.grpRowNum * (_pair)->probeIn.grpRowNum > MJOIN_HJOIN_CART_THRESHOLD) +#define REACH_HJOIN_THRESHOLD(_pair) ((_pair)->buildIn.rowNum * (_pair)->probeIn.rowNum > MJOIN_HJOIN_CART_THRESHOLD) #define SET_SAME_TS_GRP_HJOIN(_pair, _octx) ((_pair)->hashJoin = (_octx)->hashCan && REACH_HJOIN_THRESHOLD(_pair)) +#define BUILD_TB_BROKEN_BLK(_sg, _out, _in) ((_sg) && (((_out)->pCurrBlk == (_in)->pBeginBlk && (_out)->rowReadIdx != (_in)->rowBeginIdx) || ((_out)->pCurrBlk != (_in)->pBeginBlk && (_out)->rowReadIdx != 0))) + #define FIN_SAME_TS_GRP(_ctx, _octx, _done) do { \ if ((_ctx)->inSameTsGrp) { \ (_ctx)->currGrpPair.sameTsGrp = true; \ @@ -298,7 +292,8 @@ typedef struct SMJoinOperatorInfo { } \ } while (0) -#define CURRENT_BLK_GRP_ROWS(_in) (((_in)->grpRowNum + (_in)->grpRowBeginIdx) <= (_in)->grpBeginBlk->pBlk->info.rows ? (_in)->grpRowNum : ((_in)->grpBeginBlk->pBlk->info.rows - (_in)->grpRowBeginIdx)) +#define PRB_CUR_BLK_GRP_ROWS(_rn, _rb, _bn) (((_rn) + (_rb)) <= (_bn) ? (_rn) : ((_bn) - (_rb))) +#define BLD_CUR_BLK_GRP_ROWS(_sg, _rn, _rb, _bn) ((_sg) ? 1 : (((_rn) + (_rb)) <= (_bn) ? (_rn) : ((_bn) - (_rb)))) #ifdef __cplusplus diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 65ea024e61..56cb09b67f 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -320,15 +320,6 @@ static int32_t mJoinInitMergeCtx(SOperatorInfo* pOperator, SMJoinOperatorInfo* p pCtx->outputCtx.cartCtx.secondColNum = pBuildCtx->pTbInfo->finNum; pCtx->outputCtx.cartCtx.pSecondCols = pBuildCtx->pTbInfo->finCols; - pCtx->outputCtx.cartCtx.pCartRowIdx = taosArrayInit(MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM, sizeof(int32_t)); - if (NULL == pCtx->outputCtx.cartCtx.pCartRowIdx) { - return TSDB_CODE_OUT_OF_MEMORY; - } - pCtx->outputCtx.cartCtx.pCartGrps = taosArrayInit(MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM, sizeof(SMJoinCartGrp)); - if (NULL == pCtx->outputCtx.cartCtx.pCartGrps) { - return TSDB_CODE_OUT_OF_MEMORY; - } - return TSDB_CODE_SUCCESS; } @@ -358,7 +349,10 @@ static int32_t mJoinAddBlkToList(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pCtx if (NULL == pNew) { return TSDB_CODE_OUT_OF_MEMORY; } + pNew->pBlk = pBlock; + pNew->inUse = true; + if (NULL == pCtx->pTailBlk) { pCtx->pTailBlk = pCtx->pHeadBlk = pNew; pCtx->pCurrBlk = pCtx->pHeadBlk; @@ -493,13 +487,13 @@ static void mJoinLeftJoinAddBlkToGrp(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx* int32_t rowNum = pProbeCtx->pCurrBlk->pBlk->info.rows - pProbeCtx->blkRowIdx; if (pCtx->nextProbeRow && pCtx->inDiffTsGrp) { - pCtx->currGrpPair->probeIn.grpRowNum += rowNum; + pCtx->currGrpPair->prbIn.rowNum += rowNum; } else { pCtx->inDiffTsGrp = true; START_NEW_GRP(pCtx); - pCtx->currGrpPair.probeIn.grpBeginBlk = pProbeCtx->pCurrBlk; - pCtx->currGrpPair.probeIn.grpRowBeginIdx = pProbeCtx->blkRowIdx; - pCtx->currGrpPair->probeIn.grpRowNum = rowNum; + pCtx->currGrpPair.prbIn.pBeginBlk = pProbeCtx->pCurrBlk; + pCtx->currGrpPair.prbIn.rowBeginIdx = pProbeCtx->blkRowIdx; + pCtx->currGrpPair->prbIn.rowNum = rowNum; } pCtx->nextProbeRow = true; @@ -531,6 +525,7 @@ static bool mJoinBuildMoveToNextBlk(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx* contLoop = mJoinProbeMoveToNextBlk(pCtx, pProbeCtx); } else if (pCtx->probeTs[pProbeCtx->blkRowIdx] > *pCtx->buildEndTs) { contLoop = true; + pBuildCtx->pCurrBlk->inUse = false; break; } else { contLoop = false; @@ -563,8 +558,8 @@ static bool mLeftJoinSplitGrpImpl(SOperatorInfo* pOperator, SMJoinOperatorInfo* for (; pProbeCtx->blkRowIdx < pCtx->probeRowNum; ++pProbeCtx->blkRowIdx) { if (pCtx->pLastGrpPair && pCtx->pLastGrpPair->sameTsGrp - && pCtx->probeTs[pProbeCtx->blkRowIdx] == pCtx->pLastGrpPair->probeIn.grpLastTs) { - pCtx->pLastGrpPair->probeIn.grpRowNum++; + && pCtx->probeTs[pProbeCtx->blkRowIdx] == pCtx->pLastGrpPair->prbIn.grpLastTs) { + pCtx->pLastGrpPair->prbIn.rowNum++; SET_SAME_TS_GRP_HJOIN(pCtx->pLastGrpPair, pOutCtx); continue; } @@ -578,29 +573,29 @@ static bool mLeftJoinSplitGrpImpl(SOperatorInfo* pOperator, SMJoinOperatorInfo* } else if (pCtx->probeTs[pProbeCtx->blkRowIdx] == pCtx->buildTs[pBuildCtx->blkRowIdx]) { FIN_DIFF_TS_GRP(pCtx, pOutCtx, false); if (pCtx->inSameTsGrp) { - pCtx->currGrpPair.buildIn.grpRowNum++; + pCtx->currGrpPair.bldIn.rowNum++; } else { pCtx->inSameTsGrp = true; START_NEW_GRP(pCtx); - pCtx->currGrpPair.buildIn.grpBeginBlk = pBuildCtx->pCurrBlk; - pCtx->currGrpPair.buildIn.grpRowBeginIdx = pBuildCtx->blkRowIdx; - pCtx->currGrpPair.buildIn.grpRowNum = 1; - pCtx->currGrpPair.probeIn.grpBeginBlk = pProbeCtx->pCurrBlk; - pCtx->currGrpPair.probeIn.grpLastTs = pCtx->probeTs[pProbeCtx->blkRowIdx]; - pCtx->currGrpPair.probeIn.grpRowBeginIdx = pProbeCtx->blkRowIdx; - pCtx->currGrpPair.probeIn.grpRowNum = 1; + pCtx->currGrpPair.bldIn.pBeginBlk = pBuildCtx->pCurrBlk; + pCtx->currGrpPair.bldIn.rowBeginIdx = pBuildCtx->blkRowIdx; + pCtx->currGrpPair.bldIn.rowNum = 1; + pCtx->currGrpPair.prbIn.pBeginBlk = pProbeCtx->pCurrBlk; + pCtx->currGrpPair.prbIn.grpLastTs = pCtx->probeTs[pProbeCtx->blkRowIdx]; + pCtx->currGrpPair.prbIn.rowBeginIdx = pProbeCtx->blkRowIdx; + pCtx->currGrpPair.prbIn.rowNum = 1; } pCtx->nextProbeRow = false; } else { FIN_SAME_TS_GRP(pCtx, pOutCtx, true); if (pCtx->inDiffTsGrp) { - pCtx->currGrpPair.probeIn.grpRowNum++; + pCtx->currGrpPair.prbIn.rowNum++; } else { pCtx->inDiffTsGrp = true; START_NEW_GRP(pCtx); - pCtx->currGrpPair.probeIn.grpBeginBlk = pProbeCtx->pCurrBlk; - pCtx->currGrpPair.probeIn.grpRowBeginIdx = pProbeCtx->blkRowIdx; - pCtx->currGrpPair.probeIn.grpRowNum = 1; + pCtx->currGrpPair.prbIn.pBeginBlk = pProbeCtx->pCurrBlk; + pCtx->currGrpPair.prbIn.rowBeginIdx = pProbeCtx->blkRowIdx; + pCtx->currGrpPair.prbIn.rowNum = 1; } pCtx->nextProbeRow = true; break; @@ -654,7 +649,8 @@ static bool mLeftJoinSplitGrp(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoi if (0 == pJoin->ctx.mergeCtx.buildTbCtx.blkNum) { ASSERTS(pJoin->ctx.mergeCtx.buildTbCtx.dsFetchDone, "left join empty build table while fetch not done"); - + + FIN_SAME_TS_GRP(pCtx, &pJoin->ctx.mergeCtx.outputCtx, true); FIN_DIFF_TS_GRP(pCtx, &pJoin->ctx.mergeCtx.outputCtx, pJoin->ctx.mergeCtx.buildTbCtx.dsFetchDone); } else { mLeftJoinSplitGrpImpl(pOperator, pJoin); @@ -670,7 +666,7 @@ static void mLeftJoinCart(SMJoinCartCtx* pCtx) { for (int32_t c = 0; c < pCtx->firstColNum; ++c) { SMJoinColMap* pFirstCol = pCtx->pFirstCols + c; - SColumnInfoData* pInCol = taosArrayGet(pCtx->pFirstBlk->pDataBlock, pFirstCol->srcSlot); + SColumnInfoData* pInCol = taosArrayGet(pCtx->pFirstBlk->pBlk, pFirstCol->srcSlot); SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pFirstCol->dstSlot); for (int32_t r = 0; r < pCtx->firstRowNum; ++r) { if (colDataIsNull_s(pInCol, pCtx->firstRowIdx + r)) { @@ -681,33 +677,44 @@ static void mLeftJoinCart(SMJoinCartCtx* pCtx) { } } - for (int32_t c = 0; c < pCtx->secondColNum; ++c) { - SMJoinColMap* pSecondCol = pCtx->pSecondCols + c; - SColumnInfoData* pInCol = taosArrayGet(pCtx->pSecondBlk->pDataBlock, pSecondCol->srcSlot); - SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pSecondCol->dstSlot); - for (int32_t r = 0; r < pCtx->firstRowNum; ++r) { - colDataAssignNRows(pOutCol, currRows + r * pCtx->secondRowNum, pInCol, pCtx->secondRowIdx, pCtx->secondRowNum); + if (pCtx->firstOnly) { + ASSERT(1 == pCtx->secondRowNum); + for (int32_t c = 0; c < pCtx->secondColNum; ++c) { + SMJoinColMap* pSecondCol = pCtx->pSecondCols + c; + SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pSecondCol->dstSlot); + colDataSetNItemsNull(pOutCol, currRows, pCtx->firstRowNum); + } + } else { + for (int32_t c = 0; c < pCtx->secondColNum; ++c) { + SMJoinColMap* pSecondCol = pCtx->pSecondCols + c; + SColumnInfoData* pInCol = taosArrayGet(pCtx->pSecondBlk->pBlk, pSecondCol->srcSlot); + SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pSecondCol->dstSlot); + for (int32_t r = 0; r < pCtx->firstRowNum; ++r) { + colDataAssignNRows(pOutCol, currRows + r * pCtx->secondRowNum, pInCol, pCtx->secondRowIdx, pCtx->secondRowNum); + } } } + + pCtx->pResBlk.info.rows += pCtx->firstRowNum * pCtx->secondRowNum; } static bool mLeftJoinSameTsOutput(SMJoinOperatorInfo* pJoin, SGrpPairRes* pPair) { - SProbeGrpResIn* pProbeIn = &pPair->probeIn; - SBuildGrpResIn* pBuildIn = &pPair->buildIn; + SProbeGrpResIn* pProbeIn = &pPair->prbIn; + SBuildGrpResIn* pBuildIn = &pPair->prbIn; if (!pPair->outBegin) { } - SMJoinBlkInfo* pBInfo = pProbeIn->grpBeginBlk; + SMJoinBlkInfo* pBInfo = pProbeIn->pBeginBlk; do { if (pJoin->prevFilter) { } else { - if (pPair->buildOut.hashJoin) { + if (pPair->bldOut.hashJoin) { } else { - for (; pProbeIn->grpRowBeginIdx < pBInfo->pBlk->info.rows && pProbeIn->grpRowNum > 0; pProbeIn->grpRowBeginIdx++, pProbeIn->grpRowNum--) { + for (; pProbeIn->rowBeginIdx < pBInfo->pBlk->info.rows && pProbeIn->rowNum > 0; pProbeIn->rowBeginIdx++, pProbeIn->rowNum--) { } } @@ -719,64 +726,107 @@ static bool mLeftJoinCartOutput(SMJoinOperatorInfo* pJoin, SMJoinOutputCtx* pCtx bool contLoop = false; SMJoinCartCtx* pCart = &pCtx->cartCtx; int32_t grpNum = taosArrayGetSize(pCtx->pGrpResList); - int32_t rowsLeft = pCart->pResBlk pCart->pResBlk->info.rows; + int32_t rowsLeft = pCart->pResBlk->info.capacity - pCart->pResBlk->info.rows; + int32_t probeRows = 0; + int32_t buildRows = 0; + bool grpDone = false, brokenBlk = false; - for (; pCtx->grpReadIdx < grpNum; pCtx->grpReadIdx++) { - SGrpPairRes* pPair = taosArrayGet(pCtx->pGrpResList, pCtx->grpReadIdx); - if (!pPair->finishGrp) { + for (; pCtx->grpReadIdx < grpNum && pCart->pResBlk->info.rows <= pCart->resThreshold; pCtx->grpReadIdx++) { + SGrpPairRes* pair = taosArrayGet(pCtx->pGrpResList, pCtx->grpReadIdx); + if (!pair->finishGrp) { ASSERTS(pCtx->grpReadIdx == grpNum - 1, "unfinished grp not the last"); taosArrayRemoveBatch(pCtx->pGrpResList, 0, pCtx->grpReadIdx - 1, NULL); pCtx->grpReadIdx = 0; break; } - if (pPair->hashJoin) { - contLoop = mLeftJoinHashOutput(pJoin, pPair); - } else if (pCtx->cartCtx.appendRes) { - contLoop = mLeftJoinDirectOutput(pJoin, pPair); - } + grpDone = false; + pCart->firstOnly = !pair->sameTsGrp; + do { + if (!pair->outBegin) { + probeRows = PRB_CUR_BLK_GRP_ROWS(pair->prbIn.rowNum, pair->prbIn.rowBeginIdx, pair->prbIn.pBeginBlk->pBlk->info.rows); + buildRows = BLD_CUR_BLK_GRP_ROWS(pair->sameTsGrp, pair->bldIn.rowNum, pair->bldIn.rowBeginIdx, pair->bldIn.pBeginBlk->pBlk->info.rows); + pCart->pFirstBlk = pair->prbIn.pBeginBlk; + pCart->firstRowIdx = pair->prbIn.rowBeginIdx; + pair->prbOut.pCurrBlk = pair->prbIn.pBeginBlk; + if (pair->sameTsGrp) { + pCart->pSecondBlk = pair->bldIn.pBeginBlk; + pCart->secondRowIdx = pair->bldIn.rowBeginIdx; + pair->bldOut.pCurrBlk = pair->bldIn.pBeginBlk; + } + pair->outBegin = true; + brokenBlk = false; + } else if (BUILD_TB_BROKEN_BLK(pair->sameTsGrp, &pair->bldOut, &pair->bldIn)) { + probeRows = PRB_CUR_BLK_GRP_ROWS(1, pair->prbOut.rowReadIdx, pair->prbOut.pCurrBlk->pBlk.info.rows); + buildRows = BLD_CUR_BLK_GRP_ROWS(pair->bldIn.rowNum - pair->bldOut.rowGReadNum, pair->bldOut.rowReadIdx, pair->bldOut.pCurrBlk->pBlk.info.rows); + pCart->firstRowIdx = pair->prbOut.rowReadIdx; + pCart->secondRowIdx = pair->bldOut.rowReadIdx; + brokenBlk = true; + } else { + probeRows = PRB_CUR_BLK_GRP_ROWS(pair->prbIn.rowNum - pair->prbOut.rowGReadNum, pair->prbOut.rowReadIdx, pair->prbOut.pCurrBlk->pBlk.info.rows); + buildRows = BLD_CUR_BLK_GRP_ROWS(pair->bldIn.rowNum - pair->bldOut.rowGReadNum, pair->bldOut.rowReadIdx, pair->bldOut.pCurrBlk->pBlk.info.rows); + pCart->firstRowIdx = pair->prbOut.rowReadIdx; + if (pair->sameTsGrp) { + pCart->secondRowIdx = pair->bldOut.rowReadIdx; + } + brokenBlk = false; + } - if (!contLoop) { - return false; - } - } - -} + int64_t reqNum = probeRows * buildRows; + if (reqNum <= rowsLeft) { + pCart->firstRowNum = probeRows; + pCart->secondRowNum = buildRows; + + pair->prbOut.rowGReadNum += probeRows; + if (!brokenBlk) { + pair->prbOut.rowReadIdx = 0; -static bool mLeftJoinOutput(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { - SMJoinOutputCtx* pCtx = &pJoin->ctx.mergeCtx.outputCtx; - bool contLoop = false; - int32_t grpNum = taosArrayGetSize(pCtx->pGrpResList); + if (pair->sameTsGrp) { + pair->bldOut.rowGReadNum += buildRows; + pair->bldOut.rowReadIdx = 0; + } + } else { + pair->prbOut.rowReadIdx += probeRows; + } - if (pCtx->cartCtx.appendRes) { - return mLeftJoinCartOutput(pJoin, pCtx); - } - - for (; pCtx->grpReadIdx < grpNum; pCtx->grpReadIdx++) { - SGrpPairRes* pPair = taosArrayGet(pCtx->pGrpResList, pCtx->grpReadIdx); - if (!pPair->finishGrp) { - ASSERTS(pCtx->grpReadIdx == grpNum - 1, "unfinished grp not the last"); - taosArrayRemoveBatch(pCtx->pGrpResList, 0, pCtx->grpReadIdx - 1, NULL); - pCtx->grpReadIdx = 0; + if (pair->prbOut.rowGReadNum >= pair->prbIn.rowNum) { + grpDone = true; + } + + rowsLeft -= reqNum; + } else if (buildRows <= rowsLeft) { + pCart->firstRowNum = brokenBlk ? 1 : (rowsLeft / buildRows); + + pair->prbOut.rowGReadNum += pCart->firstRowNum; + pair->prbOut.rowReadIdx = pCart->firstRowIdx + pCart->firstRowNum; + + pCart->secondRowNum = buildRows; + pair->bldOut.rowReadIdx = 0; + + rowsLeft -= (pCart->firstRowNum * pCart->secondRowNum); + } else { + ASSERT(pair->sameTsGrp); + + pCart->firstRowNum = 1; + pCart->secondRowNum = rowsLeft; + + pair->bldOut.rowReadIdx = pCart->secondRowIdx + rowsLeft; + + rowsLeft = 0; + } + + mLeftJoinCart(pCart); + }while ((!grpDone) && pCart->pResBlk->info.rows <= pCart->resThreshold); + + if (!grpDone) { break; } - - if (pPair->hashJoin) { - contLoop = mLeftJoinHashOutput(pJoin, pPair); - } else if (pCtx->cartCtx.appendRes) { - contLoop = mLeftJoinDirectOutput(pJoin, pPair); - } - - if (!contLoop) { - return false; - } } - - pJoin->ctx.mergeCtx.joinPhase = E_JOIN_PHASE_RETRIEVE; - - return true; + } + + static SSDataBlock* mJoinDoLeftJoin(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { bool contLoop = false;