From 7c0e8b559c0f2acafad04a2f0d5e6103b433644c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 6 Dec 2023 19:22:14 +0800 Subject: [PATCH] feat: support left join --- source/common/src/tdatablock.c | 36 ++ source/libs/executor/inc/mergejoin.h | 137 ++++--- source/libs/executor/src/mergejoinoperator.c | 371 ++++++++++++++----- 3 files changed, 396 insertions(+), 148 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 054cff560f..87d49cd902 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -479,6 +479,42 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p return 0; } +int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx, int32_t numOfRows) { + if (pDst->info.type != pSrc->info.type || pSrc->reassigned) { + return TSDB_CODE_FAILED; + } + + if (numOfRows <= 0) { + return numOfRows; + } + + if (IS_VAR_DATA_TYPE(pDst->info.type)) { + memcpy(pDst->varmeta.offset, pSrc->varmeta.offset, sizeof(int32_t) * numOfRows); + if (pDst->varmeta.allocLen < pSrc->varmeta.length) { + char* tmp = taosMemoryRealloc(pDst->pData, pSrc->varmeta.length); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pDst->pData = tmp; + pDst->varmeta.allocLen = pSrc->varmeta.length; + } + + pDst->varmeta.length = pSrc->varmeta.length; + if (pDst->pData != NULL && pSrc->pData != NULL) { + memcpy(pDst->pData, pSrc->pData, pSrc->varmeta.length); + } + } else { + memcpy(pDst->nullbitmap, pSrc->nullbitmap, BitmapLen(numOfRows)); + if (pSrc->pData != NULL) { + memcpy(pDst->pData, pSrc->pData, pSrc->info.bytes * numOfRows); + } + } + + return 0; +} + + size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { return taosArrayGetSize(pBlock->pDataBlock); } size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; } diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index cee789e426..8ee5bf8301 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -30,20 +30,29 @@ typedef enum EJoinPhase { E_JOIN_PHASE_RETRIEVE, E_JOIN_PHASE_SPLIT, E_JOIN_PHASE_OUTPUT, - E_JOIN_PHASE_ + E_JOIN_PHASE_DONE } EJoinPhase; typedef struct SMJoinColInfo { int32_t srcSlot; int32_t dstSlot; - bool keyCol; - bool vardata; - int32_t* offset; - int32_t bytes; - char* data; - char* bitMap; } SMJoinColInfo; +typedef struct SMJoinCartCtx { + SSDataBlock* pResBlk; + + int32_t firstColNum; + SMJoinColInfo* pFirstCols; + SSDataBlock* pFirstBlk; + int32_t firstRowIdx; + int32_t firstRowNum; + + int32_t secondColNum; + SMJoinColInfo* pSecondCols; + SSDataBlock* pSecondBlk; + int32_t secondRowIdx; + int32_t secondRowNum; +} SMJoinCartCtx; typedef struct SMJoinBlkInfo { bool cloned; @@ -65,6 +74,10 @@ typedef struct SMJoinTableInfo { SMJoinColInfo* primCol; char* primData; + + int32_t finNum; + SMJoinColInfo* finCols; + int32_t keyNum; SMJoinColInfo* keyCols; @@ -91,41 +104,61 @@ typedef struct SMJoinTsJoinCtx { int64_t* buildEndTs; bool inSameTsGrp; bool inDiffTsGrp; - SGrpPairCtx* pLastGrpPairCtx; - SGrpPairCtx currGrpPairCtx; + bool nextProbeRow; + SGrpPairRes* pLastGrpPair; + SGrpPairRes currGrpPair; } SMJoinTsJoinCtx; -typedef struct SBuildGrpCtx { +typedef struct SBuildGrpResIn { bool multiBlkGrp; + SMJoinBlkInfo* grpBeginBlk; + int32_t grpRowBeginIdx; + int32_t grpRowNum; +} SBuildGrpResIn; + +typedef struct SBuildGrpResOut { bool hashJoin; SSHashObj* pGrpHash; int32_t grpRowReadIdx; int32_t grpRowGReadIdx; +} SBuildGrpResOut; + +typedef struct SProbeGrpResIn { + bool allRowsGrp; + SMJoinBlkInfo* grpBeginBlk; int32_t grpRowBeginIdx; int32_t grpRowNum; -} SBuildGrpCtx; + int64_t grpLastTs; +} SProbeGrpResIn; -typedef struct SProbeGrpCtx { +typedef struct SProbeGrpResOut { int32_t grpRowReadIdx; - int32_t grpRowBeginIdx; - int32_t grpRowNum; -} SProbeGrpCtx; +} SProbeGrpResOut; -typedef struct SGrpPairCtx { - bool sameTsGrp; - bool finishGrp; - SBuildGrpCtx buildGrp; - SProbeGrpCtx probeGrp; -} SGrpPairCtx; +typedef struct SGrpPairRes { + bool sameTsGrp; + bool finishGrp; + SProbeGrpResIn probeIn; + SBuildGrpResIn buildIn; + + /* KEEP THIS PART AT THE END */ + bool outBegin; + SBuildGrpResOut buildOut; + SProbeGrpResOut probeOut; + /* KEEP THIS PART AT THE END */ +} SGrpPairRes; + +#define GRP_PAIR_INIT_SIZE (sizeof(SGrpPairRes) - sizeof(bool) - sizeof(SBuildGrpResOut) - sizeof(SProbeGrpResOut)) typedef struct SMJoinOutputCtx { - int32_t grpReadIdx; - int32_t grpWriteIdx; - SArray* pGrpList; + int32_t grpReadIdx; + SMJoinCartCtx cartCtx; + SArray* pGrpResList; } SMJoinOutputCtx; typedef struct SMJoinTableCtx { EJoinTableType type; + void* blkFetchedFp; SMJoinTableInfo* pTbInfo; bool dsInitDone; bool dsFetchDone; @@ -197,40 +230,34 @@ typedef struct SMJoinOperatorInfo { SMJoinExecInfo execInfo; } SMJoinOperatorInfo; -#define MJOIN_DOWNSTREAM_NEED_INIT(_pOp) ((_pOp)->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)(_pOp)->pOperatorGetParam->value)->initDownstream) +#define MJOIN_DS_REQ_INIT(_pOp) ((_pOp)->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)(_pOp)->pOperatorGetParam->value)->initDownstream) +#define MJOIN_DS_NEED_INIT(_pOp, _tbctx) (MJOIN_DS_REQ_INIT(_pOp) && (!(_tbctx)->dsInitDone)) +#define MJOIN_TB_LOW_BLK(_tbctx) ((_tbctx)->blkNum <= 0 || ((_tbctx)->blkNum == 1 && (_tbctx)->pHeadBlk->cloned)) -#define FIN_SAME_TS_GRP() do { \ - if (inSameTsGrp) { \ - grpPairCtx.sameTsGrp = true; \ - grpPairCtx.finishGrp = true; \ - grpPairCtx.probeGrp.grpRowBeginIdx = pProbeCtx->blkRowIdx; \ - grpPairCtx.probeGrp.grpRowNum = 1; \ - inSameTsGrp = false; \ - pLastGrpPairCtx = taosArrayPush(pCtx->grpCtx.pGrpList, &grpPairCtx); \ - } \ +#define START_NEW_GRP(_ctx) memset(&(_ctx)->currGrpPair, 0, GRP_PAIR_INIT_SIZE) + +#define FIN_SAME_TS_GRP(_ctx, _octx, _done) do { \ + if ((_ctx)->inSameTsGrp) { \ + (_ctx)->currGrpPair.sameTsGrp = true; \ + (_ctx)->currGrpPair.finishGrp = (_done); \ + (_ctx)->inSameTsGrp = false; \ + (_ctx)->pLastGrpPair = taosArrayPush((_octx)->pGrpResList, &(_ctx)->currGrpPair); \ + } \ } while (0) -#define PAUSE_SAME_TS_GRP() do { \ - if (inSameTsGrp) { \ - grpPairCtx.sameTsGrp = true; \ - grpPairCtx.finishGrp = false; \ - grpPairCtx.probeGrp.grpRowBeginIdx = pProbeCtx->blkRowIdx; \ - grpPairCtx.probeGrp.grpRowNum = 1; \ - inSameTsGrp = false; \ - pLastGrpPairCtx = taosArrayPush(pCtx->grpCtx.pGrpList, &grpPairCtx); \ - } \ - } while (0) - - -#define FIN_DIFF_TS_GRP() do { \ - if (inDiffTsGrp) { \ - grpPairCtx.sameTsGrp = false; \ - grpPairCtx.finishGrp = true; \ - grpPairCtx.probeGrp.grpRowBeginIdx = pProbeCtx->blkRowIdx; \ - grpPairCtx.probeGrp.grpRowNum = 1; \ - inDiffTsGrp = false; \ - pLastGrpPairCtx = taosArrayPush(pCtx->grpCtx.pGrpList, &grpPairCtx); \ - } \ +#define FIN_DIFF_TS_GRP(_ctx, _octx, _done) do { \ + if ((_ctx)->inDiffTsGrp) { \ + (_ctx)->currGrpPair.sameTsGrp = false; \ + (_ctx)->currGrpPair.finishGrp = true; \ + (_ctx)->currGrpPair.probeIn.allRowsGrp= (_done); \ + (_ctx)->inDiffTsGrp = false; \ + (_ctx)->pLastGrpPair = taosArrayPush((_octx)->pGrpResList, &(_ctx)->currGrpPair); \ + } else if (_done) { \ + (_ctx)->currGrpPair.sameTsGrp = false; \ + (_ctx)->currGrpPair.finishGrp = true; \ + (_ctx)->currGrpPair.probeIn.grpRowBeginIdx = (_ctx)->pProbeCtx->blkRowIdx; \ + (_ctx)->currGrpPair.probeIn.allRowsGrp = true; \ + } \ } while (0) diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index ffaac1e323..a220dce31d 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -266,93 +266,147 @@ static int32_t mJoinInitBufPages(SMJoinOperatorInfo* pInfo) { return mJoinAddPageToBufList(pInfo->pRowBufs); } -static int32_t mJoinDoRetrieve(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTbCtx, SMJoinTableInfo* pTbInfo) { +static int32_t mJoinDoRetrieve(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTbCtx) { bool retrieveCont = false; int32_t code = TSDB_CODE_SUCCESS; do { - SSDataBlock* pBlock = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTbInfo->downStreamIdx); + SSDataBlock* pBlock = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTbCtx->pTbInfo->downStreamIdx); pTbCtx->dsInitDone = true; if (NULL == pBlock) { retrieveCont = false; - code = (*pJoin->joinFps.handleTbFetchDoneFp)(pJoin, pTbCtx, pTbInfo); + code = (*pJoin->joinFps.handleTbFetchDoneFp)(pJoin, pTbCtx); } else { - code = (*pJoin->joinFps.handleBlkFetchedFp)(pJoin, pTbCtx, pTbInfo, pBlock, &retrieveCont); + code = (*pTbCtx->blkFetchedFp)(pJoin, pTbCtx, pBlock, &retrieveCont); } } while (retrieveCont || TSDB_CODE_SUCCESS != code); return code; } -static FORCE_INLINE bool mJoinBuildTbNeedRetrieve(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pBuildCtx) { - if ((!pBuildCtx->grpRetrieved) && (!pBuildCtx->dsFetchDone)) { - return true; - } - return false; -} - -static FORCE_INLINE bool mJoinProbeTbNeedRetrieve(SOperatorInfo* pOperator, SMJoinTableCtx* pProbeCtx, SMJoinTableCtx* pBuildCtx) { - SMJoinOperatorInfo* pJoin = pOperator->info; - if (MJOIN_DOWNSTREAM_NEED_INIT(pOperator) && !pBuildCtx->dsInitDone) { - return true; - } - if (((!pProbeCtx->grpRetrieved) && (!pProbeCtx->dsFetchDone)) && (pBuildCtx->grpRetrieved || pJoin->ctx.flags.retrieveAfterBuildDone))) { - return true; - } - return false; -} - -static bool mJoinOpenMergeJoin(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { - SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; - SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx; - SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx; - int32_t code = mJoinDoRetrieve(pOperator, pBuildCtx, pJoin->pBuild); - if (TSDB_CODE_SUCCESS != code) { - pOperator->pTaskInfo->code = code; - T_LONG_JMP(pOperator->pTaskInfo->env, code); - } - if (pBuildCtx->dsFetchDone && !pJoin->ctx.flags.retrieveAfterBuildDone) { - return false; - } - - code = mJoinDoRetrieve(pOperator, pProbeCtx, pJoin->pProbe); - if (TSDB_CODE_SUCCESS != code) { - pOperator->pTaskInfo->code = code; - T_LONG_JMP(pOperator->pTaskInfo->env, code); - } - return true; -} - static void mJoinInitJoinCtx(SMJoinOperatorInfo* pJoin) { + blkFetchedFp; } -static void mJoinLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { - SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; - SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx; - SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx; +static void mJoinSetDone(SOperatorInfo* pOperator) { + setOperatorCompleted(pOperator); + if (pOperator->pDownstreamGetParams) { + freeOperatorParam(pOperator->pDownstreamGetParams[0], OP_GET_PARAM); + freeOperatorParam(pOperator->pDownstreamGetParams[1], OP_GET_PARAM); + pOperator->pDownstreamGetParams[0] = NULL; + pOperator->pDownstreamGetParams[1] = NULL; + } +} - int32_t code = mJoinDoRetrieve(pOperator, pProbeCtx, pJoin->pProbe); - if (TSDB_CODE_SUCCESS != code) { - pOperator->pTaskInfo->code = code; - T_LONG_JMP(pOperator->pTaskInfo->env, code); +static int32_t mJoinAddBlkToList(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pCtx, SSDataBlock* pBlock) { + SMJoinBlkInfo* pNew = taosMemoryCalloc(1, sizeof(SMJoinBlkInfo)); + if (NULL == pNew) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pNew->pBlk = pBlock; + if (NULL == pCtx->pTailBlk) { + pCtx->pTailBlk = pCtx->pHeadBlk = pNew; + pCtx->pCurrBlk = pCtx->pHeadBlk; + pCtx->blkIdx = 0; + pCtx->blkRowIdx = 0; + pCtx->blkNum = 1; + if (E_JOIN_TB_PROBE == pCtx->type) { + SColumnInfoData* probeCol = taosArrayGet(pCtx->pCurrBlk->pBlk, pCtx->pTbInfo->primCol->srcSlot); + pCtx->blkCurTs = *(int64_t*)probeCol->pData; + } + } else { + pCtx->pTailBlk->pNext = pNew; + pCtx->blkNum++; + if (E_JOIN_TB_PROBE == pCtx->type) { + SMJoinTsJoinCtx* pTsCtx = &pJoin->ctx.mergeCtx.tsJoinCtx; + pCtx->blkCurTs = pTsCtx->probeTs[pCtx->blkRowIdx]; + } } - if (NULL == pProbeCtx->pHeadBlk) { - return; + return TSDB_CODE_SUCCESS; +} + +static int32_t mLeftJoinProbeFetchDone(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pProbeCtx) { + pProbeCtx->dsFetchDone = true; + return TSDB_CODE_SUCCESS; +} + +static int32_t mLeftJoinBuildFetchDone(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pBuildCtx) { + pBuildCtx->dsFetchDone = true; + return TSDB_CODE_SUCCESS; +} + + +static int32_t mLeftJoinProbeBlkFetched(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pProbeCtx, SSDataBlock* pBlock, bool* retrieveCont) { + int32_t code = mJoinAddBlkToList(pJoin, pProbeCtx, pBlock); + if (code) { + return code; } - code = mJoinDoRetrieve(pJoin, pBuildCtx, pJoin->pBuild); - if (TSDB_CODE_SUCCESS != code) { - pOperator->pTaskInfo->code = code; - T_LONG_JMP(pOperator->pTaskInfo->env, code); - } - - pJoin->ctx.mergeCtx.joinPhase = E_JOIN_PHASE_TS_JOIN; + *retrieveCont = false; + return TSDB_CODE_SUCCESS; } -static FORCE_INLINE void mJoinLefeJoinUpdateTsJoinCtx(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx, SMJoinTableCtx* pBuildCtx) { +static int32_t mLeftJoinBuildBlkFetched(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pBuildCtx, SSDataBlock* pBlock, bool* retrieveCont) { + SMJoinTableCtx* pProbeCtx = &pJoin->ctx.mergeCtx.probeTbCtx; + if (pProbeCtx->blkNum <= 0) { + *retrieveCont = false; + return TSDB_CODE_SUCCESS; + } + + SColumnInfoData* tsCol = taosArrayGet(pBlock, pBuildCtx->pTbInfo->primCol->srcSlot); + int64_t lastTs = *((int64_t*)tsCol->pData + pBlock->info.rows - 1); + if (pProbeCtx->blkCurTs > lastTs) { + *retrieveCont = true; + } else { + int32_t code = mJoinAddBlkToList(pJoin, pBuildCtx, pBlock); + if (code) { + return code; + } + + if (pProbeCtx->blkCurTs == lastTs && lastTs == *(int64_t*)tsCol->pData) { + *retrieveCont = true; + } else { + *retrieveCont = false; + } + } + + return TSDB_CODE_SUCCESS; +} + +static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { + SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; + SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx; + SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx; + + if ((!pProbeCtx->dsFetchDone) && MJOIN_TB_LOW_BLK(pProbeCtx)) { + int32_t code = mJoinDoRetrieve(pOperator, pProbeCtx, pJoin->pProbe); + if (TSDB_CODE_SUCCESS != code) { + pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, code); + } + } + + if ((pProbeCtx->blkNum > 0 || MJOIN_DS_NEED_INIT(pOperator, pBuildCtx)) && (!pBuildCtx->dsFetchDone) && MJOIN_TB_LOW_BLK(pBuildCtx)) { + int32_t code = mJoinDoRetrieve(pJoin, pBuildCtx, pJoin->pBuild); + if (TSDB_CODE_SUCCESS != code) { + pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, code); + } + } + + if (pProbeCtx->pHeadBlk) { + pJoin->ctx.mergeCtx.joinPhase = E_JOIN_PHASE_SPLIT; + return true; + } + + mJoinSetDone(pOperator); + return false; +} + +static FORCE_INLINE void mLefeJoinUpdateTsJoinCtx(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx, SMJoinTableCtx* pBuildCtx) { pCtx->probeRowNum = pProbeCtx->pCurrBlk->pBlk->info.rows; pCtx->buildRowNum = pBuildCtx->pCurrBlk->pBlk->info.rows; SColumnInfoData* probeCol = taosArrayGet(pProbeCtx->pCurrBlk->pBlk, pProbeCtx->pTbInfo->primCol->srcSlot); @@ -363,13 +417,14 @@ static FORCE_INLINE void mJoinLefeJoinUpdateTsJoinCtx(SMJoinTsJoinCtx* pCtx, SMJ pCtx->buildEndTs = (int64_t*)buildCol->pData + pCtx->buildRowNum - 1; } -static bool mJoinMoveToNextProbeTable(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx) { +static bool mJoinProbeMoveToNextBlk(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx) { if (NULL == pProbeCtx->pCurrBlk->pNext) { pProbeCtx->blkIdx++; return false; } pProbeCtx->pCurrBlk = pProbeCtx->pCurrBlk->pNext; + pProbeCtx->blkIdx++; pCtx->probeRowNum = pProbeCtx->pCurrBlk->pBlk->info.rows; SColumnInfoData* probeCol = taosArrayGet(pProbeCtx->pCurrBlk->pBlk, pProbeCtx->pTbInfo->primCol->srcSlot); pCtx->probeTs = (int64_t*)probeCol->pData; @@ -379,17 +434,38 @@ static bool mJoinMoveToNextProbeTable(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pPr return true; } -static bool mJoinMoveToNextBuildTable(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pBuildCtx, SMJoinTableCtx* pProbeCtx) { +static void mJoinLeftJoinAddBlkToGrp(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx, SMJoinTableCtx* pBuildCtx) { + FIN_SAME_TS_GRP(); + + int32_t rowNum = pProbeCtx->pCurrBlk->pBlk->info.rows - pProbeCtx->blkRowIdx; + + if (pCtx->nextProbeRow && pCtx->inDiffTsGrp) { + pCtx->currGrpPair->probeIn.grpRowNum += rowNum; + } else { + pCtx->inDiffTsGrp = true; + START_NEW_GRP(pCtx); + pCtx->currGrpPair.probeIn.grpBeginBlk = pProbeCtx->pCurrBlk; + pCtx->currGrpPair.probeIn.grpRowBeginIdx = pProbeCtx->blkRowIdx; + pCtx->currGrpPair->probeIn.grpRowNum = rowNum; + } + + pCtx->nextProbeRow = true; +} + +static bool mJoinBuildMoveToNextBlk(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pBuildCtx, SMJoinTableCtx* pProbeCtx) { bool contLoop = false; - bool res = false; + bool res = true; + + pCtx->nextProbeRow = false; do { - if (pBuildCtx->pCurrBlk->pNext) { + if (NULL == pBuildCtx->pCurrBlk->pNext) { pBuildCtx->blkIdx++; return false; } pBuildCtx->pCurrBlk = pBuildCtx->pCurrBlk->pNext; + pBuildCtx->blkIdx++; pCtx->buildRowNum = pBuildCtx->pCurrBlk->pBlk->info.rows; SColumnInfoData* buildCol = taosArrayGet(pBuildCtx->pCurrBlk->pBlk, pBuildCtx->pTbInfo->primCol->srcSlot); pCtx->buildTs = (int64_t*)buildCol->pData; @@ -399,8 +475,9 @@ static bool mJoinMoveToNextBuildTable(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx do { if (*pCtx->buildTs > pCtx->probeTs[pProbeCtx->blkRowIdx]) { mJoinLeftJoinAddBlkToGrp(pJoin, pCtx, pProbeCtx, pBuildCtx); - contLoop = mJoinMoveToNextProbeTable(pCtx, pProbeCtx); + contLoop = mJoinProbeMoveToNextBlk(pCtx, pProbeCtx); } else if (pCtx->probeTs[pProbeCtx->blkRowIdx] > *pCtx->buildEndTs) { + contLoop = true; break; } else { contLoop = false; @@ -412,75 +489,90 @@ static bool mJoinMoveToNextBuildTable(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx return res; } - - -static void mJoinLeftJoinSplitGrp(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { +static bool mLeftJoinSplitGrpImpl(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { SMJoinTsJoinCtx* pCtx = &pJoin->ctx.mergeCtx.tsJoinCtx; + SMJoinOutputCtx* pOutCtx = &pJoin->ctx.mergeCtx.outputCtx; SMJoinTableCtx* pProbeCtx = &pJoin->ctx.mergeCtx.probeTbCtx; SMJoinTableCtx* pBuildCtx = &pJoin->ctx.mergeCtx.buildTbCtx; - mJoinLefeJoinUpdateTsJoinCtx(pCtx, pProbeCtx, pBuildCtx); + mLefeJoinUpdateTsJoinCtx(pCtx, pProbeCtx, pBuildCtx); - bool nextRow = false; - for (; pProbeCtx->blkIdx < pProbeCtx->blkNum; mJoinMoveToNextProbeTable(pCtx, pProbeCtx)) { + for (; pProbeCtx->blkIdx < pProbeCtx->blkNum; mJoinProbeMoveToNextBlk(pCtx, pProbeCtx)) { if (*pCtx->buildTs > *pCtx->probeEndTs) { mJoinLeftJoinAddBlkToGrp(pJoin, pCtx, pProbeCtx, pBuildCtx); continue; } else if (*pCtx->probeTs > *pCtx->buildEndTs) { - if (!mJoinMoveToNextBuildTable(pJoin, pCtx, pBuildCtx, pProbeCtx)) { + if (!mJoinBuildMoveToNextBlk(pJoin, pCtx, pBuildCtx, pProbeCtx)) { break; //retrieve build } } for (; pProbeCtx->blkRowIdx < pCtx->probeRowNum; ++pProbeCtx->blkRowIdx) { - for (; pBuildCtx->blkIdx < pBuildCtx->blkNum; mJoinMoveToNextBuildTable(pJoin, pCtx, pBuildCtx, pProbeCtx)) { + if (pCtx->pLastGrpPair && pCtx->pLastGrpPair->sameTsGrp + && pCtx->probeTs[pProbeCtx->blkRowIdx] == pCtx->pLastGrpPair->probeIn.grpLastTs) { + pCtx->pLastGrpPair->probeIn.grpRowNum++; + continue; + } + for (; pBuildCtx->blkIdx < pBuildCtx->blkNum; mJoinBuildMoveToNextBlk(pJoin, pCtx, pBuildCtx, pProbeCtx)) { for (; pBuildCtx->blkRowIdx < pCtx->buildRowNum; ++pBuildCtx->blkRowIdx) { if (pCtx->probeTs[pProbeCtx->blkRowIdx] > pCtx->buildTs[pBuildCtx->blkRowIdx]) { - FIN_SAME_TS_GRP(); - FIN_DIFF_TS_GRP(); + FIN_SAME_TS_GRP(pCtx, pOutCtx, true); + FIN_DIFF_TS_GRP(pCtx, pOutCtx, false); + pCtx->nextProbeRow = false; continue; } else if (pCtx->probeTs[pProbeCtx->blkRowIdx] == pCtx->buildTs[pBuildCtx->blkRowIdx]) { - FIN_DIFF_TS_GRP(); + FIN_DIFF_TS_GRP(pCtx, pOutCtx, false); if (pCtx->inSameTsGrp) { - pCtx->currGrpPairCtx.buildGrp.grpRowNum++; + pCtx->currGrpPair.buildIn.grpRowNum++; } else { - pCtx->inSameTsGrp = true; - pCtx->currGrpPairCtx.buildGrp.grpRowBeginIdx = pBuildCtx->blkRowIdx; + pCtx->inSameTsGrp = true; + START_NEW_GRP(pCtx); + pCtx->currGrpPair.buildIn.grpBeginBlk = pBuildCtx->pCurrBlk; + pCtx->currGrpPair.buildIn.grpRowBeginIdx = pBuildCtx->blkRowIdx; + pCtx->currGrpPair.buildIn.grpRowNum = 1; + pCtx->currGrpPair.probeIn.grpBeginBlk = pProbeCtx->pCurrBlk; + pCtx->currGrpPair.probeIn.grpLastTs = pCtx->probeTs[pProbeCtx->blkRowIdx]; + pCtx->currGrpPair.probeIn.grpRowBeginIdx = pProbeCtx->blkRowIdx; + pCtx->currGrpPair.probeIn.grpRowNum = 1; } + pCtx->nextProbeRow = false; } else { - FIN_SAME_TS_GRP(); + FIN_SAME_TS_GRP(pCtx, pOutCtx, true); if (pCtx->inDiffTsGrp) { - pCtx->currGrpPairCtx.probeGrp.grpRowNum++; + pCtx->currGrpPair.probeIn.grpRowNum++; } else { pCtx->inDiffTsGrp = true; - pCtx->currGrpPairCtx.probeGrp.grpRowBeginIdx = pProbeCtx->blkRowIdx; + START_NEW_GRP(pCtx); + pCtx->currGrpPair.probeIn.grpBeginBlk = pProbeCtx->pCurrBlk; + pCtx->currGrpPair.probeIn.grpRowBeginIdx = pProbeCtx->blkRowIdx; + pCtx->currGrpPair.probeIn.grpRowNum = 1; } - nextRow = true; + pCtx->nextProbeRow = true; break; } } // end of single build table - if (nextRow) { + if (pCtx->nextProbeRow) { break; } } // end of all build tables - if (nextRow) { + if (pCtx->nextProbeRow) { continue; } if (pCtx->inSameTsGrp) { - PAUSE_SAME_TS_GRP(); + FIN_SAME_TS_GRP(pCtx, pOutCtx, pBuildCtx->dsFetchDone); } break; } // end of single probe table - if (nextRow) { + if (pCtx->nextProbeRow) { continue; } @@ -488,15 +580,105 @@ static void mJoinLeftJoinSplitGrp(SOperatorInfo* pOperator, SMJoinOperatorInfo* } // end of all probe tables - FIN_DIFF_TS_GRP(); + FIN_DIFF_TS_GRP(pCtx, pOutCtx, pBuildCtx->dsFetchDone); + + return true; +} + +static FORCE_INLINE void mJoinResetTsJoinCtx(SMJoinTsJoinCtx* pCtx) { + pCtx->inSameTsGrp = false; + pCtx->inDiffTsGrp = false; + pCtx->nextProbeRow = false; + pCtx->pLastGrpPair = NULL; +} + +static bool mLeftJoinSplitGrp(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { + SMJoinTsJoinCtx* pCtx = &pJoin->ctx.mergeCtx.tsJoinCtx; + SMJoinTableCtx* pBuildCtx = &pJoin->ctx.mergeCtx.buildTbCtx; + + mJoinResetTsJoinCtx(pCtx); + + if (0 == pJoin->ctx.mergeCtx.buildTbCtx.blkNum) { + ASSERTS(pJoin->ctx.mergeCtx.buildTbCtx.dsFetchDone, "left join empty build table while fetch not done"); + + FIN_DIFF_TS_GRP(pCtx, &pJoin->ctx.mergeCtx.outputCtx, pJoin->ctx.mergeCtx.buildTbCtx.dsFetchDone); + } else { + mLeftJoinSplitGrpImpl(pOperator, pJoin); + } pJoin->ctx.mergeCtx.joinPhase = E_JOIN_PHASE_OUTPUT; return true; } -static bool mJoinLeftJoinOutput(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { +static void mLeftJoinCart(SMJoinCartCtx* pCtx) { + int32_t currRows = pCtx->pResBlk->info.rows; + for (int32_t c = 0; c < pCtx->firstColNum; ++c) { + SMJoinColInfo* pFirstCol = pCtx->pFirstCols + c; + SColumnInfoData* pInCol = taosArrayGet(pCtx->pFirstBlk->pDataBlock, pFirstCol->srcSlot); + SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pFirstCol->dstSlot); + for (int32_t r = 0; r < pCtx->firstRowNum; ++r) { + if (colDataIsNull_s(pInCol, pCtx->firstRowIdx + r)) { + colDataSetNItemsNull(pOutCol, currRows + r * pCtx->secondRowNum, pCtx->secondRowNum); + } else { + colDataSetNItems(pOutCol, currRows + r * pCtx->secondRowNum, colDataGetData(pInCol, pCtx->firstRowIdx + r), pCtx->secondRowNum, true); + } + } + } +} +static bool mLeftJoinSameTsOutput(SMJoinOperatorInfo* pJoin, SGrpPairRes* pPair) { + SProbeGrpResIn* pProbeIn = &pPair->probeIn; + SBuildGrpResIn* pBuildIn = &pPair->buildIn; + + if (!pPair->outBegin) { + + } + + SMJoinBlkInfo* pBInfo = pProbeIn->grpBeginBlk; + do { + if (pJoin->prevFilter) { + + } else { + if (pPair->buildOut.hashJoin) { + + } else { + for (; pProbeIn->grpRowBeginIdx < pBInfo->pBlk->info.rows && pProbeIn->grpRowNum > 0; pProbeIn->grpRowBeginIdx++, pProbeIn->grpRowNum--) { + + } + } + } + } while (true); +} + +static bool mLeftJoinOutput(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { + SMJoinOutputCtx* pCtx = &pJoin->ctx.mergeCtx.outputCtx; + bool contLoop = false; + int32_t grpNum = taosArrayGetSize(pCtx->pGrpResList); + + for (; pCtx->grpReadIdx < grpNum; pCtx->grpReadIdx++) { + SGrpPairRes* pPair = taosArrayGet(pCtx->pGrpResList, pCtx->grpReadIdx); + if (!pPair->finishGrp) { + ASSERTS(pCtx->grpReadIdx == grpNum - 1, "unfinished grp not the last"); + taosArrayRemoveBatch(pCtx->pGrpResList, 0, pCtx->grpReadIdx - 1, NULL); + pCtx->grpReadIdx = 0; + break; + } + + if (pPair->sameTsGrp) { + contLoop = mLeftJoinSameTsOutput(pJoin, pPair); + } else { + contLoop = mLeftJoinDirectOutput(pJoin, pPair); + } + + if (!contLoop) { + return false; + } + } + + pJoin->ctx.mergeCtx.joinPhase = E_JOIN_PHASE_RETRIEVE; + + return true; } static SSDataBlock* mJoinDoLeftJoin(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { @@ -505,13 +687,16 @@ static SSDataBlock* mJoinDoLeftJoin(SOperatorInfo* pOperator, SMJoinOperatorInfo do { switch (pJoin->ctx.mergeCtx.joinPhase) { case E_JOIN_PHASE_RETRIEVE: - contLoop = mJoinLeftJoinRetrieve(pOperator, pJoin); + contLoop = mLeftJoinRetrieve(pOperator, pJoin); break; case E_JOIN_PHASE_SPLIT: - contLoop = mJoinLeftJoinSplitGrp(pOperator, pJoin); + contLoop = mLeftJoinSplitGrp(pOperator, pJoin); break; case E_JOIN_PHASE_OUTPUT: - contLoop = mJoinLeftJoinOutput(pOperator, pJoin); + contLoop = mLeftJoinOutput(pOperator, pJoin); + break; + case E_JOIN_PHASE_DONE: + contLoop = false; break; } } while (contLoop);