diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 3faf5c1b57..8796547bce 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -30,6 +30,8 @@ typedef enum EJoinTableType { E_JOIN_TB_PROBE } EJoinTableType; +#define MJOIN_TBTYPE(_type) (E_JOIN_TB_BUILD == (_type) ? "BUILD" : "PROBE") + typedef enum EJoinPhase { E_JOIN_PHASE_RETRIEVE, E_JOIN_PHASE_SPLIT, @@ -53,41 +55,13 @@ typedef struct SMJoinColInfo { char* bitMap; } SMJoinColInfo; -typedef struct SMJoinCartCtx { - bool appendRes; - bool firstOnly; - int32_t resThreshold; - SSDataBlock* pResBlk; - - int32_t firstColNum; - SMJoinColMap* pFirstCols; - int32_t secondColNum; - SMJoinColMap* pSecondCols; - - SMJoinBlkInfo* pFirstBlk; - SMJoinBlkInfo* pSecondBlk; - int32_t firstRowIdx; - int32_t firstRowNum; - int32_t secondRowIdx; - int32_t secondRowNum; -} SMJoinCartCtx; - -typedef struct SMJoinBlkInfo { - bool cloned; - bool inUse; - SSDataBlock* pBlk; - void* pNext; -} SMJoinBlkInfo; - -typedef struct SMJoinRowInfo { - int64_t blkId; - int32_t rowIdx; - int64_t rowGIdx; -} SMJoinRowInfo; typedef struct SMJoinTableInfo { int32_t downStreamIdx; SOperatorInfo* downStream; + bool dsInitDone; + bool dsFetchDone; + int32_t blkId; SQueryStat inputStat; @@ -112,97 +86,27 @@ typedef struct SMJoinTableInfo { int32_t valBufSize; SArray* valVarCols; bool valColExist; + + int32_t rowIdx; + int32_t grpIdx; + SArray* eqGrps; + SArray* createdBlks; + SSDataBlock* blk; } SMJoinTableInfo; -typedef struct SMJoinTsJoinCtx { - SMJoinTableCtx* pProbeCtx; - SMJoinTableCtx* pBuildCtx; - int64_t probeRowNum; - int64_t buildRowNum; - int64_t* probeTs; - int64_t* buildTs; - int64_t* probeEndTs; - int64_t* buildEndTs; - bool inSameTsGrp; - bool inDiffTsGrp; - bool nextProbeRow; - SGrpPairRes* pLastGrpPair; - SGrpPairRes currGrpPair; -} SMJoinTsJoinCtx; - -typedef struct SBuildGrpResIn { - bool multiBlk; - SMJoinBlkInfo* pBeginBlk; - int32_t rowBeginIdx; - int32_t rowNum; -} SBuildGrpResIn; - -typedef struct SBuildGrpResOut { - SSHashObj* pHash; - SMJoinBlkInfo* pCurrBlk; - int32_t rowReadIdx; - int32_t rowGReadNum; -} SBuildGrpResOut; - -typedef struct SProbeGrpResIn { - bool allRowsGrp; - SMJoinBlkInfo* pBeginBlk; - int32_t rowBeginIdx; - int32_t rowNum; - int64_t grpLastTs; -} SProbeGrpResIn; - -typedef struct SProbeGrpResOut { - SMJoinBlkInfo* pCurrBlk; - int32_t rowReadIdx; - int32_t rowGReadNum; -} SProbeGrpResOut; - -typedef struct SGrpPairRes { - bool sameTsGrp; - bool finishGrp; - bool hashJoin; - SProbeGrpResIn prbIn; - SBuildGrpResIn bldIn; - - /* KEEP THIS PART AT THE END */ - bool outBegin; - SBuildGrpResOut bldOut; - SProbeGrpResOut prbOut; - /* KEEP THIS PART AT THE END */ -} SGrpPairRes; - -#define GRP_PAIR_INIT_SIZE (sizeof(SGrpPairRes) - sizeof(bool) - sizeof(SBuildGrpResOut) - sizeof(SProbeGrpResOut)) - -typedef struct SMJoinOutputCtx { - bool hashCan; - int32_t grpReadIdx; - int64_t grpCurTs; - SMJoinCartCtx cartCtx; - SArray* pGrpResList; -} SMJoinOutputCtx; - -typedef struct SMJoinTableCtx { - EJoinTableType type; - void* blkFetchedFp; - SMJoinTableInfo* pTbInfo; - bool dsInitDone; - bool dsFetchDone; - int64_t blkCurTs; - int32_t blkRowIdx; - int64_t blkIdx; - int64_t blkNum; - SMJoinBlkInfo* pCurrBlk; - SMJoinBlkInfo* pHeadBlk; - SMJoinBlkInfo* pTailBlk; -} SMJoinTableCtx; +typedef struct SMJoinGrpRows { + SSDataBlock* blk; + int32_t beginIdx; + int32_t rowsNum; +} SMJoinGrpRows; typedef struct SMJoinMergeCtx { - EJoinPhase joinPhase; - SMJoinOutputCtx outputCtx; - SMJoinTsJoinCtx tsJoinCtx; - SMJoinTableCtx buildTbCtx; - SMJoinTableCtx probeTbCtx; + bool hashCan; + bool rowRemains; + bool eqCart; + int64_t curTs; + SMJoinGrpRows probeNEqGrps; + bool hashJoin; } SMJoinMergeCtx; typedef struct SMJoinWinCtx { @@ -242,9 +146,9 @@ typedef struct SMJoinOperatorInfo { int32_t subType; int32_t inputTsOrder; SMJoinTableInfo tbs[2]; - SMJoinTableInfo* pBuild; - SMJoinTableInfo* pProbe; - SSDataBlock* pRes; + SMJoinTableInfo* build; + SMJoinTableInfo* probe; + SSDataBlock* pResBlk; int32_t pResColNum; int8_t* pResColMap; SFilterInfo* pFPreFilter; @@ -265,35 +169,9 @@ typedef struct SMJoinOperatorInfo { #define SET_SAME_TS_GRP_HJOIN(_pair, _octx) ((_pair)->hashJoin = (_octx)->hashCan && REACH_HJOIN_THRESHOLD(_pair)) -#define BUILD_TB_BROKEN_BLK(_sg, _out, _in) ((_sg) && (((_out)->pCurrBlk == (_in)->pBeginBlk && (_out)->rowReadIdx != (_in)->rowBeginIdx) || ((_out)->pCurrBlk != (_in)->pBeginBlk && (_out)->rowReadIdx != 0))) +#define LEFT_JOIN_NO_EQUAL(_order, _pts, _bts) ((_order) && (_pts) < (_bts)) || (!(_order) && (_pts) > (_bts)) +#define LEFT_JOIN_DISCRAD(_order, _pts, _bts) ((_order) && (_pts) > (_bts)) || (!(_order) && (_pts) < (_bts)) -#define FIN_SAME_TS_GRP(_ctx, _octx, _done) do { \ - if ((_ctx)->inSameTsGrp) { \ - (_ctx)->currGrpPair.sameTsGrp = true; \ - (_ctx)->currGrpPair.finishGrp = (_done); \ - SET_SAME_TS_GRP_HJOIN(&(_ctx)->currGrpPair, _octx); \ - (_ctx)->inSameTsGrp = false; \ - (_ctx)->pLastGrpPair = taosArrayPush((_octx)->pGrpResList, &(_ctx)->currGrpPair); \ - } \ - } while (0) - -#define FIN_DIFF_TS_GRP(_ctx, _octx, _done) do { \ - if ((_ctx)->inDiffTsGrp) { \ - (_ctx)->currGrpPair.sameTsGrp = false; \ - (_ctx)->currGrpPair.finishGrp = true; \ - (_ctx)->currGrpPair.probeIn.allRowsGrp= (_done); \ - (_ctx)->inDiffTsGrp = false; \ - (_ctx)->pLastGrpPair = taosArrayPush((_octx)->pGrpResList, &(_ctx)->currGrpPair); \ - } else if (_done) { \ - (_ctx)->currGrpPair.sameTsGrp = false; \ - (_ctx)->currGrpPair.finishGrp = true; \ - (_ctx)->currGrpPair.probeIn.grpRowBeginIdx = (_ctx)->pProbeCtx->blkRowIdx; \ - (_ctx)->currGrpPair.probeIn.allRowsGrp = true; \ - } \ - } while (0) - -#define PRB_CUR_BLK_GRP_ROWS(_rn, _rb, _bn) (((_rn) + (_rb)) <= (_bn) ? (_rn) : ((_bn) - (_rb))) -#define BLD_CUR_BLK_GRP_ROWS(_sg, _rn, _rb, _bn) ((_sg) ? 1 : (((_rn) + (_rb)) <= (_bn) ? (_rn) : ((_bn) - (_rb)))) #ifdef __cplusplus diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 56cb09b67f..3efefb3e74 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -212,11 +212,11 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin break; } - pInfo->pBuild = &pInfo->tbs[buildIdx]; - pInfo->pProbe = &pInfo->tbs[probeIdx]; + pInfo->build = &pInfo->tbs[buildIdx]; + pInfo->probe = &pInfo->tbs[probeIdx]; - pInfo->pBuild->downStreamIdx = buildIdx; - pInfo->pProbe->downStreamIdx = probeIdx; + pInfo->build->downStreamIdx = buildIdx; + pInfo->probe->downStreamIdx = probeIdx; } static int32_t mJoinBuildResColMap(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode) { @@ -231,7 +231,7 @@ static int32_t mJoinBuildResColMap(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhys FOREACH(pNode, pJoinNode->pTargets) { STargetNode* pTarget = (STargetNode*)pNode; SColumnNode* pCol = (SColumnNode*)pTarget->pExpr; - if (pCol->dataBlockId == pInfo->pBuild->blkId) { + if (pCol->dataBlockId == pInfo->build->blkId) { pInfo->pResColMap[i] = 1; } @@ -264,43 +264,19 @@ static int32_t mJoinInitBufPages(SMJoinOperatorInfo* pInfo) { return mJoinAddPageToBufList(pInfo->pRowBufs); } -static int32_t mJoinDoRetrieve(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTbCtx) { - bool retrieveCont = false; - int32_t code = TSDB_CODE_SUCCESS; - - do { - SSDataBlock* pBlock = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTbCtx->pTbInfo->downStreamIdx); - pTbCtx->dsInitDone = true; - - if (NULL == pBlock) { - retrieveCont = false; - code = (*pJoin->joinFps.handleTbFetchDoneFp)(pJoin, pTbCtx); - } else { - code = (*pTbCtx->blkFetchedFp)(pJoin, pTbCtx, pBlock, &retrieveCont); - } - } while (retrieveCont || TSDB_CODE_SUCCESS != code); - - return code; -} - static int32_t mJoinInitMergeCtx(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; - SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx; - SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx; pProbeCtx->type = E_JOIN_TB_PROBE; pBuildCtx->type = E_JOIN_TB_BUILD; - pCtx->tsJoinCtx.pProbeCtx = pProbeCtx; - pCtx->tsJoinCtx.pBuildCtx = pBuildCtx; + pCtx->hashCan = pJoin->probe->eqNum > 0; - pCtx->joinPhase = E_JOIN_PHASE_RETRIEVE; + pCtx->probeEqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows)); + pCtx->probeCreatedBlks = taosArrayInit(8, POINTER_BYTES); - pCtx->outputCtx.hashCan = pProbeCtx->pTbInfo->eqNum > 0; - pCtx->outputCtx.pGrpResList = taosArrayInit(MJOIN_DEFAULT_BLK_ROWS_NUM, sizeof(SGrpPairRes)); - if (NULL == pCtx->outputCtx.pGrpResList) { - return TSDB_CODE_OUT_OF_MEMORY; - } + pCtx->buildEqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows)); + pCtx->buildCreatedBlks = taosArrayInit(8, POINTER_BYTES); if (pJoin->pFPreFilter) { pCtx->outputCtx.cartCtx.pResBlk = createOneDataBlock(pJoin->pRes); @@ -314,11 +290,6 @@ static int32_t mJoinInitMergeCtx(SOperatorInfo* pOperator, SMJoinOperatorInfo* p if (!pCtx->outputCtx.hashCan && NULL == pJoin->pFPreFilter) { pCtx->outputCtx.cartCtx.appendRes = true; } - - pCtx->outputCtx.cartCtx.firstColNum = pProbeCtx->pTbInfo->finNum; - pCtx->outputCtx.cartCtx.pFirstCols = pProbeCtx->pTbInfo->finCols; - pCtx->outputCtx.cartCtx.secondColNum = pBuildCtx->pTbInfo->finNum; - pCtx->outputCtx.cartCtx.pSecondCols = pBuildCtx->pTbInfo->finCols; return TSDB_CODE_SUCCESS; } @@ -375,84 +346,6 @@ static int32_t mJoinAddBlkToList(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pCtx return TSDB_CODE_SUCCESS; } -static int32_t mLeftJoinProbeFetchDone(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pProbeCtx) { - pProbeCtx->dsFetchDone = true; - return TSDB_CODE_SUCCESS; -} - -static int32_t mLeftJoinBuildFetchDone(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pBuildCtx) { - pBuildCtx->dsFetchDone = true; - return TSDB_CODE_SUCCESS; -} - - -static int32_t mLeftJoinProbeBlkFetched(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pProbeCtx, SSDataBlock* pBlock, bool* retrieveCont) { - int32_t code = mJoinAddBlkToList(pJoin, pProbeCtx, pBlock); - if (code) { - return code; - } - - *retrieveCont = false; - return TSDB_CODE_SUCCESS; -} - -static int32_t mLeftJoinBuildBlkFetched(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pBuildCtx, SSDataBlock* pBlock, bool* retrieveCont) { - SMJoinTableCtx* pProbeCtx = &pJoin->ctx.mergeCtx.probeTbCtx; - if (pProbeCtx->blkNum <= 0) { - *retrieveCont = false; - return TSDB_CODE_SUCCESS; - } - - SColumnInfoData* tsCol = taosArrayGet(pBlock, pBuildCtx->pTbInfo->primCol->srcSlot); - int64_t lastTs = *((int64_t*)tsCol->pData + pBlock->info.rows - 1); - if (pProbeCtx->blkCurTs > lastTs) { - *retrieveCont = true; - } else { - int32_t code = mJoinAddBlkToList(pJoin, pBuildCtx, pBlock); - if (code) { - return code; - } - - if (pProbeCtx->blkCurTs == lastTs && lastTs == *(int64_t*)tsCol->pData) { - *retrieveCont = true; - } else { - *retrieveCont = false; - } - } - - return TSDB_CODE_SUCCESS; -} - -static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { - SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; - SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx; - SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx; - - if ((!pProbeCtx->dsFetchDone) && MJOIN_TB_LOW_BLK(pProbeCtx)) { - int32_t code = mJoinDoRetrieve(pOperator, pProbeCtx, pJoin->pProbe); - if (TSDB_CODE_SUCCESS != code) { - pOperator->pTaskInfo->code = code; - T_LONG_JMP(pOperator->pTaskInfo->env, code); - } - } - - if ((pProbeCtx->blkNum > 0 || MJOIN_DS_NEED_INIT(pOperator, pBuildCtx)) && (!pBuildCtx->dsFetchDone) && MJOIN_TB_LOW_BLK(pBuildCtx)) { - int32_t code = mJoinDoRetrieve(pJoin, pBuildCtx, pJoin->pBuild); - if (TSDB_CODE_SUCCESS != code) { - pOperator->pTaskInfo->code = code; - T_LONG_JMP(pOperator->pTaskInfo->env, code); - } - } - - if (pProbeCtx->pHeadBlk) { - pJoin->ctx.mergeCtx.joinPhase = E_JOIN_PHASE_SPLIT; - return true; - } - - mJoinSetDone(pOperator); - return false; -} - static FORCE_INLINE void mLefeJoinUpdateTsJoinCtx(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx, SMJoinTableCtx* pBuildCtx) { pCtx->probeRowNum = pProbeCtx->pCurrBlk->pBlk->info.rows; pCtx->buildRowNum = pBuildCtx->pCurrBlk->pBlk->info.rows; @@ -464,176 +357,6 @@ static FORCE_INLINE void mLefeJoinUpdateTsJoinCtx(SMJoinTsJoinCtx* pCtx, SMJoinT pCtx->buildEndTs = (int64_t*)buildCol->pData + pCtx->buildRowNum - 1; } -static bool mJoinProbeMoveToNextBlk(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx) { - if (NULL == pProbeCtx->pCurrBlk->pNext) { - pProbeCtx->blkIdx++; - return false; - } - - pProbeCtx->pCurrBlk = pProbeCtx->pCurrBlk->pNext; - pProbeCtx->blkIdx++; - pCtx->probeRowNum = pProbeCtx->pCurrBlk->pBlk->info.rows; - SColumnInfoData* probeCol = taosArrayGet(pProbeCtx->pCurrBlk->pBlk, pProbeCtx->pTbInfo->primCol->srcSlot); - pCtx->probeTs = (int64_t*)probeCol->pData; - pCtx->probeEndTs = (int64_t*)probeCol->pData + pCtx->probeRowNum - 1; - pProbeCtx->blkRowIdx = 0; - - return true; -} - -static void mJoinLeftJoinAddBlkToGrp(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx, SMJoinTableCtx* pBuildCtx) { - FIN_SAME_TS_GRP(); - - int32_t rowNum = pProbeCtx->pCurrBlk->pBlk->info.rows - pProbeCtx->blkRowIdx; - - if (pCtx->nextProbeRow && pCtx->inDiffTsGrp) { - pCtx->currGrpPair->prbIn.rowNum += rowNum; - } else { - pCtx->inDiffTsGrp = true; - START_NEW_GRP(pCtx); - pCtx->currGrpPair.prbIn.pBeginBlk = pProbeCtx->pCurrBlk; - pCtx->currGrpPair.prbIn.rowBeginIdx = pProbeCtx->blkRowIdx; - pCtx->currGrpPair->prbIn.rowNum = rowNum; - } - - pCtx->nextProbeRow = true; -} - -static bool mJoinBuildMoveToNextBlk(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pBuildCtx, SMJoinTableCtx* pProbeCtx) { - bool contLoop = false; - bool res = true; - - pCtx->nextProbeRow = false; - - do { - if (NULL == pBuildCtx->pCurrBlk->pNext) { - pBuildCtx->blkIdx++; - return false; - } - - pBuildCtx->pCurrBlk = pBuildCtx->pCurrBlk->pNext; - pBuildCtx->blkIdx++; - pCtx->buildRowNum = pBuildCtx->pCurrBlk->pBlk->info.rows; - SColumnInfoData* buildCol = taosArrayGet(pBuildCtx->pCurrBlk->pBlk, pBuildCtx->pTbInfo->primCol->srcSlot); - pCtx->buildTs = (int64_t*)buildCol->pData; - pCtx->buildEndTs = (int64_t*)buildCol->pData + pCtx->buildRowNum - 1; - pBuildCtx->blkRowIdx = 0; - - do { - if (*pCtx->buildTs > pCtx->probeTs[pProbeCtx->blkRowIdx]) { - mJoinLeftJoinAddBlkToGrp(pJoin, pCtx, pProbeCtx, pBuildCtx); - contLoop = mJoinProbeMoveToNextBlk(pCtx, pProbeCtx); - } else if (pCtx->probeTs[pProbeCtx->blkRowIdx] > *pCtx->buildEndTs) { - contLoop = true; - pBuildCtx->pCurrBlk->inUse = false; - break; - } else { - contLoop = false; - res = true; - } - } while (contLoop); - } while (contLoop); - - return res; -} - -static bool mLeftJoinSplitGrpImpl(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { - SMJoinTsJoinCtx* pCtx = &pJoin->ctx.mergeCtx.tsJoinCtx; - SMJoinOutputCtx* pOutCtx = &pJoin->ctx.mergeCtx.outputCtx; - SMJoinTableCtx* pProbeCtx = &pJoin->ctx.mergeCtx.probeTbCtx; - SMJoinTableCtx* pBuildCtx = &pJoin->ctx.mergeCtx.buildTbCtx; - - mLefeJoinUpdateTsJoinCtx(pCtx, pProbeCtx, pBuildCtx); - - for (; pProbeCtx->blkIdx < pProbeCtx->blkNum; mJoinProbeMoveToNextBlk(pCtx, pProbeCtx)) { - if (*pCtx->buildTs > *pCtx->probeEndTs) { - mJoinLeftJoinAddBlkToGrp(pJoin, pCtx, pProbeCtx, pBuildCtx); - continue; - } else if (*pCtx->probeTs > *pCtx->buildEndTs) { - if (!mJoinBuildMoveToNextBlk(pJoin, pCtx, pBuildCtx, pProbeCtx)) { - break; - //retrieve build - } - } - - for (; pProbeCtx->blkRowIdx < pCtx->probeRowNum; ++pProbeCtx->blkRowIdx) { - if (pCtx->pLastGrpPair && pCtx->pLastGrpPair->sameTsGrp - && pCtx->probeTs[pProbeCtx->blkRowIdx] == pCtx->pLastGrpPair->prbIn.grpLastTs) { - pCtx->pLastGrpPair->prbIn.rowNum++; - SET_SAME_TS_GRP_HJOIN(pCtx->pLastGrpPair, pOutCtx); - continue; - } - for (; pBuildCtx->blkIdx < pBuildCtx->blkNum; mJoinBuildMoveToNextBlk(pJoin, pCtx, pBuildCtx, pProbeCtx)) { - for (; pBuildCtx->blkRowIdx < pCtx->buildRowNum; ++pBuildCtx->blkRowIdx) { - if (pCtx->probeTs[pProbeCtx->blkRowIdx] > pCtx->buildTs[pBuildCtx->blkRowIdx]) { - FIN_SAME_TS_GRP(pCtx, pOutCtx, true); - FIN_DIFF_TS_GRP(pCtx, pOutCtx, false); - pCtx->nextProbeRow = false; - continue; - } else if (pCtx->probeTs[pProbeCtx->blkRowIdx] == pCtx->buildTs[pBuildCtx->blkRowIdx]) { - FIN_DIFF_TS_GRP(pCtx, pOutCtx, false); - if (pCtx->inSameTsGrp) { - pCtx->currGrpPair.bldIn.rowNum++; - } else { - pCtx->inSameTsGrp = true; - START_NEW_GRP(pCtx); - pCtx->currGrpPair.bldIn.pBeginBlk = pBuildCtx->pCurrBlk; - pCtx->currGrpPair.bldIn.rowBeginIdx = pBuildCtx->blkRowIdx; - pCtx->currGrpPair.bldIn.rowNum = 1; - pCtx->currGrpPair.prbIn.pBeginBlk = pProbeCtx->pCurrBlk; - pCtx->currGrpPair.prbIn.grpLastTs = pCtx->probeTs[pProbeCtx->blkRowIdx]; - pCtx->currGrpPair.prbIn.rowBeginIdx = pProbeCtx->blkRowIdx; - pCtx->currGrpPair.prbIn.rowNum = 1; - } - pCtx->nextProbeRow = false; - } else { - FIN_SAME_TS_GRP(pCtx, pOutCtx, true); - if (pCtx->inDiffTsGrp) { - pCtx->currGrpPair.prbIn.rowNum++; - } else { - pCtx->inDiffTsGrp = true; - START_NEW_GRP(pCtx); - pCtx->currGrpPair.prbIn.pBeginBlk = pProbeCtx->pCurrBlk; - pCtx->currGrpPair.prbIn.rowBeginIdx = pProbeCtx->blkRowIdx; - pCtx->currGrpPair.prbIn.rowNum = 1; - } - pCtx->nextProbeRow = true; - break; - } - } - - // end of single build table - if (pCtx->nextProbeRow) { - break; - } - } - - // end of all build tables - if (pCtx->nextProbeRow) { - continue; - } - - if (pCtx->inSameTsGrp) { - FIN_SAME_TS_GRP(pCtx, pOutCtx, pBuildCtx->dsFetchDone); - } - - break; - } - - // end of single probe table - if (pCtx->nextProbeRow) { - continue; - } - - break; - } - - // end of all probe tables - FIN_DIFF_TS_GRP(pCtx, pOutCtx, pBuildCtx->dsFetchDone); - - return true; -} - static FORCE_INLINE void mJoinResetTsJoinCtx(SMJoinTsJoinCtx* pCtx) { pCtx->inSameTsGrp = false; pCtx->inDiffTsGrp = false; @@ -641,25 +364,6 @@ static FORCE_INLINE void mJoinResetTsJoinCtx(SMJoinTsJoinCtx* pCtx) { pCtx->pLastGrpPair = NULL; } -static bool mLeftJoinSplitGrp(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { - SMJoinTsJoinCtx* pCtx = &pJoin->ctx.mergeCtx.tsJoinCtx; - SMJoinTableCtx* pBuildCtx = &pJoin->ctx.mergeCtx.buildTbCtx; - - mJoinResetTsJoinCtx(pCtx); - - if (0 == pJoin->ctx.mergeCtx.buildTbCtx.blkNum) { - ASSERTS(pJoin->ctx.mergeCtx.buildTbCtx.dsFetchDone, "left join empty build table while fetch not done"); - - FIN_SAME_TS_GRP(pCtx, &pJoin->ctx.mergeCtx.outputCtx, true); - FIN_DIFF_TS_GRP(pCtx, &pJoin->ctx.mergeCtx.outputCtx, pJoin->ctx.mergeCtx.buildTbCtx.dsFetchDone); - } else { - mLeftJoinSplitGrpImpl(pOperator, pJoin); - } - - pJoin->ctx.mergeCtx.joinPhase = E_JOIN_PHASE_OUTPUT; - - return true; -} static void mLeftJoinCart(SMJoinCartCtx* pCtx) { int32_t currRows = pCtx->appendRes ? pCtx->pResBlk->info.rows : 0; @@ -698,156 +402,247 @@ static void mLeftJoinCart(SMJoinCartCtx* pCtx) { pCtx->pResBlk.info.rows += pCtx->firstRowNum * pCtx->secondRowNum; } -static bool mLeftJoinSameTsOutput(SMJoinOperatorInfo* pJoin, SGrpPairRes* pPair) { - SProbeGrpResIn* pProbeIn = &pPair->prbIn; - SBuildGrpResIn* pBuildIn = &pPair->prbIn; - - if (!pPair->outBegin) { +static bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableInfo* ppTb) { + if (!(*ppTb)->dsFetchDone && (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows)) { + (*ppBlk) = getNextBlockFromDownstreamRemain(pJoin->pOperator, (*ppTb)->downStreamIdx); + (*ppTb)->dsInitDone = true; + qDebug("merge join %s table got %" PRId64 " rows block", MJOIN_TBTYPE(ppTb->type), (*ppBlk) ? (*ppBlk)->info.rows : 0); + + *pIdx = 0; + if (NULL == (*ppBlk)) { + (*ppTb)->dsFetchDone = true; + } + return ((*ppBlk) == NULL) ? false : true; } - SMJoinBlkInfo* pBInfo = pProbeIn->pBeginBlk; + return true; +} + + +static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { + bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->rowIdx, &pJoin->probe->blk, &pJoin->probe); + bool buildGot = false; + do { - if (pJoin->prevFilter) { + if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { + buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->rowIdx, &pJoin->build->blk, &pJoin->build); + } + + if (NULL == pJoin->probe->blk) { + mJoinSetDone(pOperator); + return false; + } else if (buildGot && probeGot) { + SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot); + SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot); + if (*((int64_t*)pProbeCol->pData + pJoin->probe->rowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) { + continue; + } + } + + break; + } while (true); + return true; +} + +static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp, + int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) { + int32_t numRows = pBlock->info.rows; + ASSERT(startPos < numRows); + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId); + + int32_t i = startPos; + for (; i < numRows; ++i) { + char* pNextVal = colDataGetData(pCol, i); + if (timestamp != *(int64_t*)pNextVal) { + break; + } + } + int32_t endPos = i; + *pEndPos = endPos; + + if (endPos - startPos == 0) { + return 0; + } + + SSDataBlock* block = pBlock; + bool createdNewBlock = false; + if (endPos == numRows) { + block = blockDataExtractBlock(pBlock, startPos, endPos - startPos); + taosArrayPush(createdBlocks, &block); + createdNewBlock = true; + } + SRowLocation location = {0}; + for (int32_t j = startPos; j < endPos; ++j) { + location.pDataBlock = block; + location.pos = (createdNewBlock ? j - startPos : j); + taosArrayPush(rowLocations, &location); + } + return 0; +} + + +static void mJoinBuildEqGroups(SOperatorInfo* pOperator, SMJoinTableInfo* pTable, int64_t timestamp, bool* allBlk, bool restart) { + SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); + SMJoinGrpRows* pGrp = NULL; + int32_t + + if (restart) { + pGrp = taosArrayGet(pTable->eqGrps, 0); + } else { + pGrp = taosArrayReserve(pTable->eqGrps, 1); + } + + pGrp->beginIdx = pTable->rowIdx++; + pGrp->rowsNum = 1; + pGrp->blk = pTable->blk; + + for (; pTable->rowIdx < pTable->blk->info.rows; ++pTable->rowIdx) { + char* pNextVal = colDataGetData(pCol, pTable->rowIdx); + if (timestamp == *(int64_t*)pNextVal) { + pGrp->rowsNum++; + continue; + } + return; + } + + if (allBlk) { + *allBlk = true; + if (0 == pGrp->beginIdx) { + pGrp->blk = createOneDataBlock(pTable->blk, true); } else { - if (pPair->bldOut.hashJoin) { + pGrp->blk = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, pGrp->rowsNum - pGrp->beginIdx); + } + taosArrayPush(pTable->createdBlks, &pGrp->blk); + pGrp->beginIdx = 0; + } +} + +static int32_t mJoinRetrieveSameTsRows(SOperatorInfo* pOperator, SMJoinTableInfo* pTable, int64_t timestamp) { + SMJoinOperatorInfo* pJoin = pOperator->info; + int32_t endPos = -1; + SSDataBlock* dataBlock = startDataBlock; + bool allBlk = false; + + mJoinBuildEqGroups(pOperator, pTable, timestamp, &allBlk, true); + + while (allBlk) { + pTable->blk = getNextBlockFromDownstreamRemain(pOperator, pTable->downStreamIdx); + qDebug("merge join %s table got block for same ts, rows:%" PRId64, MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0); + + pTable->rowIdx = 0; + + if (NULL == pTable->blk) { + pTable->dsFetchDone = true; + break; + } + + allBlk = false; + mJoinBuildEqGroups(pOperator, pTable, timestamp, &allBlk, false); + } + + return 0; +} + +static int32_t mJoinEqualCart(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes) { + int32_t code = TSDB_CODE_SUCCESS; + SMJoinOperatorInfo* pJoin = pOperator->info; + SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; + SSHashObj* rightTableHash = NULL; + bool rightUseBuildTable = false; + + if (!pCtx->rowRemains) { + mJoinBuildEqGroups(pOperator, pJoin->probe, timestamp, NULL, true); + mJoinRetrieveSameTsRows(pOperator, pJoin->build, timestamp); + if (pJoinInfo->pColEqualOnConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) { + mergeJoinFillBuildTable(pJoinInfo, rightRowLocations); + pCtx->hashJoin = true; + taosArrayDestroy(rightRowLocations); + rightRowLocations = NULL; + } + } + + bool reachThreshold = false; + + if (code == TSDB_CODE_SUCCESS) { + mLeftJoinCart(pOperator, pRes, nRows, leftRowLocations, leftRowIdx, + rightRowIdx, pCtx->hashJoin, rightRowLocations, &reachThreshold); + } + + if (!reachThreshold) { + mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks, + pCtx->hashJoin, rightRowLocations); + + } else { + pJoinInfo->rowCtx.rowRemains = true; + pJoinInfo->rowCtx.ts = timestamp; + pJoinInfo->rowCtx.leftRowLocations = leftRowLocations; + pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks; + pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks; + pJoinInfo->rowCtx.rightRowLocations = rightRowLocations; + } + return TSDB_CODE_SUCCESS; +} + + + +static void mLeftJoinDo(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { + SMJoinOperatorInfo* pJoin = pOperator->info; + SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; + int64_t probeTs = INT64_MIN; + int64_t buildTs = 0; + SColumnInfoData* pBuildCol = NULL; + SColumnInfoData* pProbeCol = NULL; + + bool asc = (pJoin->inputOrder == TSDB_ORDER_ASC) ? true : false; + + do { + if (pCtx->rowRemains) { + probeTs = buildTs = pCtx->curTs; + } else { + if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx, probeTs)) { + break; + } + + pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot); + pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot); + probeTs = *((int64_t*)pProbeCol->pData + pJoin->probe->rowIdx); + buildTs = *((int64_t*)pBuildCol->pData + pCtx->buildIdx); + } + + while (pCtx->probeIdx < pJoin->probe->blk->info.rows && pCtx->buildIdx < pJoin->build->blk->info.rows) { + if (probeTs == buildTs) { + mJoinEqualCart(pOperator, probeTs, pRes); + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + return; + } + break; + } else if (LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)) { + pCtx->probeNEqGrps.beginIdx = pCtx->probeIdx; + + do { + pCtx->probeNEqGrps.rowsNum++; + probeTs = *((int64_t*)pProbeCol->pData + (++pCtx->probeIdx)); + } while (pCtx->probeIdx < pJoin->probe->blk->info.rows && LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)); + + mJoinNonEqualCart(pOperator, &pCtx->probeNEqGrps, pRes); + + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + return; + } } else { - for (; pProbeIn->rowBeginIdx < pBInfo->pBlk->info.rows && pProbeIn->rowNum > 0; pProbeIn->rowBeginIdx++, pProbeIn->rowNum--) { - + buildTs = *((int64_t*)pBuildCol->pData + (++pCtx->buildIdx)); + while (pCtx->buildIdx < pJoin->build->blk->info.rows && LEFT_JOIN_DISCRAD(asc, probeTs, buildTs)) { + buildTs = *((int64_t*)pBuildCol->pData + (++pCtx->buildIdx)); } } } } while (true); } -static bool mLeftJoinCartOutput(SMJoinOperatorInfo* pJoin, SMJoinOutputCtx* pCtx) { - bool contLoop = false; - SMJoinCartCtx* pCart = &pCtx->cartCtx; - int32_t grpNum = taosArrayGetSize(pCtx->pGrpResList); - int32_t rowsLeft = pCart->pResBlk->info.capacity - pCart->pResBlk->info.rows; - int32_t probeRows = 0; - int32_t buildRows = 0; - bool grpDone = false, brokenBlk = false; - for (; pCtx->grpReadIdx < grpNum && pCart->pResBlk->info.rows <= pCart->resThreshold; pCtx->grpReadIdx++) { - SGrpPairRes* pair = taosArrayGet(pCtx->pGrpResList, pCtx->grpReadIdx); - if (!pair->finishGrp) { - ASSERTS(pCtx->grpReadIdx == grpNum - 1, "unfinished grp not the last"); - taosArrayRemoveBatch(pCtx->pGrpResList, 0, pCtx->grpReadIdx - 1, NULL); - pCtx->grpReadIdx = 0; - break; - } - - grpDone = false; - pCart->firstOnly = !pair->sameTsGrp; - do { - if (!pair->outBegin) { - probeRows = PRB_CUR_BLK_GRP_ROWS(pair->prbIn.rowNum, pair->prbIn.rowBeginIdx, pair->prbIn.pBeginBlk->pBlk->info.rows); - buildRows = BLD_CUR_BLK_GRP_ROWS(pair->sameTsGrp, pair->bldIn.rowNum, pair->bldIn.rowBeginIdx, pair->bldIn.pBeginBlk->pBlk->info.rows); - pCart->pFirstBlk = pair->prbIn.pBeginBlk; - pCart->firstRowIdx = pair->prbIn.rowBeginIdx; - pair->prbOut.pCurrBlk = pair->prbIn.pBeginBlk; - if (pair->sameTsGrp) { - pCart->pSecondBlk = pair->bldIn.pBeginBlk; - pCart->secondRowIdx = pair->bldIn.rowBeginIdx; - pair->bldOut.pCurrBlk = pair->bldIn.pBeginBlk; - } - pair->outBegin = true; - brokenBlk = false; - } else if (BUILD_TB_BROKEN_BLK(pair->sameTsGrp, &pair->bldOut, &pair->bldIn)) { - probeRows = PRB_CUR_BLK_GRP_ROWS(1, pair->prbOut.rowReadIdx, pair->prbOut.pCurrBlk->pBlk.info.rows); - buildRows = BLD_CUR_BLK_GRP_ROWS(pair->bldIn.rowNum - pair->bldOut.rowGReadNum, pair->bldOut.rowReadIdx, pair->bldOut.pCurrBlk->pBlk.info.rows); - pCart->firstRowIdx = pair->prbOut.rowReadIdx; - pCart->secondRowIdx = pair->bldOut.rowReadIdx; - brokenBlk = true; - } else { - probeRows = PRB_CUR_BLK_GRP_ROWS(pair->prbIn.rowNum - pair->prbOut.rowGReadNum, pair->prbOut.rowReadIdx, pair->prbOut.pCurrBlk->pBlk.info.rows); - buildRows = BLD_CUR_BLK_GRP_ROWS(pair->bldIn.rowNum - pair->bldOut.rowGReadNum, pair->bldOut.rowReadIdx, pair->bldOut.pCurrBlk->pBlk.info.rows); - pCart->firstRowIdx = pair->prbOut.rowReadIdx; - if (pair->sameTsGrp) { - pCart->secondRowIdx = pair->bldOut.rowReadIdx; - } - brokenBlk = false; - } - - int64_t reqNum = probeRows * buildRows; - if (reqNum <= rowsLeft) { - pCart->firstRowNum = probeRows; - pCart->secondRowNum = buildRows; - - pair->prbOut.rowGReadNum += probeRows; - if (!brokenBlk) { - pair->prbOut.rowReadIdx = 0; - - if (pair->sameTsGrp) { - pair->bldOut.rowGReadNum += buildRows; - pair->bldOut.rowReadIdx = 0; - } - } else { - pair->prbOut.rowReadIdx += probeRows; - } - - if (pair->prbOut.rowGReadNum >= pair->prbIn.rowNum) { - grpDone = true; - } - - rowsLeft -= reqNum; - } else if (buildRows <= rowsLeft) { - pCart->firstRowNum = brokenBlk ? 1 : (rowsLeft / buildRows); - - pair->prbOut.rowGReadNum += pCart->firstRowNum; - pair->prbOut.rowReadIdx = pCart->firstRowIdx + pCart->firstRowNum; - - pCart->secondRowNum = buildRows; - pair->bldOut.rowReadIdx = 0; - - rowsLeft -= (pCart->firstRowNum * pCart->secondRowNum); - } else { - ASSERT(pair->sameTsGrp); - - pCart->firstRowNum = 1; - pCart->secondRowNum = rowsLeft; - - pair->bldOut.rowReadIdx = pCart->secondRowIdx + rowsLeft; - - rowsLeft = 0; - } - - mLeftJoinCart(pCart); - }while ((!grpDone) && pCart->pResBlk->info.rows <= pCart->resThreshold); - - if (!grpDone) { - break; - } - } - -} - - - -static SSDataBlock* mJoinDoLeftJoin(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { - bool contLoop = false; - - do { - switch (pJoin->ctx.mergeCtx.joinPhase) { - case E_JOIN_PHASE_RETRIEVE: - contLoop = mLeftJoinRetrieve(pOperator, pJoin); - break; - case E_JOIN_PHASE_SPLIT: - contLoop = mLeftJoinSplitGrp(pOperator, pJoin); - break; - case E_JOIN_PHASE_OUTPUT: - contLoop = mLeftJoinOutput(pOperator, pJoin); - break; - case E_JOIN_PHASE_DONE: - contLoop = false; - break; - } - } while (contLoop); - -} SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoin = pOperator->info;