diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 221dc3cd1f..99ef9c9548 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -207,6 +207,7 @@ size_t blockDataGetNumOfCols(const SSDataBlock* pBlock); size_t blockDataGetNumOfRows(const SSDataBlock* pBlock); int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc); +int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t srcIdx, int32_t numOfRows); int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize); int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock); @@ -231,6 +232,7 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows); void blockDataCleanup(SSDataBlock* pDataBlock); +void blockDataReset(SSDataBlock* pDataBlock); void blockDataEmpty(SSDataBlock* pDataBlock); size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int32_t extraSize); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index d898c69eb0..0a13e2ba38 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -649,6 +649,24 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) { return TSDB_CODE_SUCCESS; } +int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t srcIdx, int32_t numOfRows) { + if (pDest->info.rows + numOfRows > pDest->info.capacity) { + return TSDB_CODE_FAILED; + } + + size_t numOfCols = taosArrayGetSize(pDest->pDataBlock); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); + SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i); + + colDataAssignNRows(pCol2, pDest->info.rows, pCol1, srcIdx, numOfRows); + } + + pDest->info.rows += pSrc->info.rows; + return TSDB_CODE_SUCCESS; +} + + size_t blockDataGetSize(const SSDataBlock* pBlock) { size_t total = 0; size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); @@ -751,6 +769,8 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); + colDataAssignNRows(pDstCol, 0, pColData, startIndex, rowCount); + /* for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) { bool isNull = false; if (pBlock->pBlockAgg == NULL) { @@ -766,6 +786,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 colDataSetVal(pDstCol, j - startIndex, p, false); } } + */ } pDst->info.rows = rowCount; @@ -1282,6 +1303,31 @@ void blockDataEmpty(SSDataBlock* pDataBlock) { pInfo->window.skey = 0; } +void blockDataReset(SSDataBlock* pDataBlock) { + SDataBlockInfo* pInfo = &pDataBlock->info; + if (pInfo->capacity == 0) { + return; + } + + size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + p->hasNull = false; + p->reassigned = false; + if (IS_VAR_DATA_TYPE(p->info.type)) { + p->varmeta.length = 0; + } + } + + pInfo->rows = 0; + pInfo->dataLoad = 0; + pInfo->window.ekey = 0; + pInfo->window.skey = 0; + pInfo->id.uid = 0; + pInfo->id.groupId = 0; +} + + /* * NOTE: the type of the input column may be TSDB_DATA_TYPE_NULL, which is used to denote * the all NULL value in this column. It is an internal representation of all NULL value column, and no visible to diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 52dae0aadd..af61f11f35 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -87,8 +87,7 @@ typedef struct SMJoinTableInfo { SSDataBlock* blk; int32_t blkRowIdx; - int64_t grpRowsNum; - int64_t grpRemainRows; + int64_t grpTotalRows; int32_t grpIdx; SArray* eqGrps; SArray* createdBlks; @@ -111,19 +110,18 @@ typedef struct SMJoinGrpRows { } SMJoinGrpRows; typedef struct SMJoinMergeCtx { - bool hashCan; - bool keepOrder; - bool grpRemains; - bool midRemains; - bool eqCart; - bool noColCond; - int32_t blksCapacity; - SSDataBlock* midBlk; - SSDataBlock* finBlk; - SSDataBlock* resBlk; - int64_t lastEqTs; - SMJoinGrpRows probeNEqGrp; - bool hashJoin; + bool hashCan; + bool keepOrder; + bool grpRemains; + bool midRemains; + bool lastEqGrp; + int32_t blkThreshold; + SSDataBlock* midBlk; + SSDataBlock* finBlk; + int64_t lastEqTs; + SMJoinGrpRows probeNEqGrp; + bool hashJoin; + SMJoinOperatorInfo* pJoin; } SMJoinMergeCtx; typedef struct SMJoinWinCtx { @@ -161,17 +159,17 @@ typedef struct SMJoinOperatorInfo { SOperatorInfo* pOperator; int32_t joinType; int32_t subType; - int32_t inputTsOrder; + int32_t inputTsOrder; + int32_t errCode; SMJoinTableInfo tbs[2]; SMJoinTableInfo* build; SMJoinTableInfo* probe; - SSDataBlock* pResBlk; int32_t pResColNum; int8_t* pResColMap; SFilterInfo* pFPreFilter; SFilterInfo* pPreFilter; SFilterInfo* pFinFilter; - SMJoinFuncs* joinFps; +// SMJoinFuncs* joinFps; SMJoinCtx ctx; SMJoinExecInfo execInfo; } SMJoinOperatorInfo; @@ -180,7 +178,7 @@ typedef struct SMJoinOperatorInfo { #define MJOIN_DS_NEED_INIT(_pOp, _tbctx) (MJOIN_DS_REQ_INIT(_pOp) && (!(_tbctx)->dsInitDone)) #define MJOIN_TB_LOW_BLK(_tbctx) ((_tbctx)->blkNum <= 0 || ((_tbctx)->blkNum == 1 && (_tbctx)->pHeadBlk->cloned)) -#define REACH_HJOIN_THRESHOLD(_prb, _bld) ((_prb)->grpRowsNum * (_bld)->grpRowsNum > MJOIN_HJOIN_CART_THRESHOLD) +#define REACH_HJOIN_THRESHOLD(_prb, _bld) ((_prb)->grpTotalRows * (_bld)->grpTotalRows >= MJOIN_HJOIN_CART_THRESHOLD) #define SET_SAME_TS_GRP_HJOIN(_pair, _octx) ((_pair)->hashJoin = (_octx)->hashCan && REACH_HJOIN_THRESHOLD(_pair)) @@ -190,15 +188,42 @@ typedef struct SMJoinOperatorInfo { #define GRP_REMAIN_ROWS(_grp) ((_grp)->endIdx - (_grp)->readIdx + 1) #define GRP_DONE(_grp) ((_grp)->readIdx > (_grp)->endIdx) +#define MJOIN_TB_ROWS_DONE(_tb) ((_tb)->blkRowIdx >= (_tb)->blk->info.rows) + #define BLK_IS_FULL(_blk) ((_blk)->info.rows == (_blk)->info.capacity) -#define SET_TABLE_CUR_TS(_col, _ts, _tb) \ +#define MJOIN_GET_TB_COL_TS(_col, _ts, _tb) \ do { \ (_col) = taosArrayGet((_tb)->blk->pDataBlock, (_tb)->primCol->srcSlot); \ (_ts) = *((int64_t*)(_col)->pData + (_tb)->blkRowIdx); \ } while (0) +#define MJOIN_GET_TB_CUR_TS(_col, _ts, _tb) \ + do { \ + (_ts) = *((int64_t*)(_col)->pData + (_tb)->blkRowIdx); \ + } while (0) + + +#define MJ_ERR_RET(c) \ + do { \ + int32_t _code = (c); \ + if (_code != TSDB_CODE_SUCCESS) { \ + terrno = _code; \ + return _code; \ + } \ + } while (0) + +#define MJ_ERR_JRET(c) \ + do { \ + code = (c); \ + if (code != TSDB_CODE_SUCCESS) { \ + terrno = code; \ + goto _return; \ + } \ + } while (0) + + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index f96290dbb7..221e923214 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -179,6 +179,16 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi memcpy(&pTable->inputStat, pStat, sizeof(*pStat)); + pTable->eqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows)); + if (E_JOIN_TB_BUILD == pTable->type) { + pTable->createdBlks = taosArrayInit(8, POINTER_BYTES); + pTable->pGrpArrays = taosArrayInit(32, POINTER_BYTES); + pTable->pGrpHash = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + if (NULL == pTable->createdBlks || NULL == pTable->pGrpArrays || NULL == pTable->pGrpHash) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + return TSDB_CODE_SUCCESS; } @@ -244,64 +254,36 @@ static int32_t mJoinBuildResColMap(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhys return TSDB_CODE_SUCCESS; } - -static FORCE_INLINE int32_t mJoinAddPageToBufList(SArray* pRowBufs) { - SBufPageInfo page; - page.pageSize = HASH_JOIN_DEFAULT_PAGE_SIZE; - page.offset = 0; - page.data = taosMemoryMalloc(page.pageSize); - if (NULL == page.data) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - taosArrayPush(pRowBufs, &page); - return TSDB_CODE_SUCCESS; -} - -static int32_t mJoinInitBufPages(SMJoinOperatorInfo* pInfo) { - pInfo->pRowBufs = taosArrayInit(32, sizeof(SBufPageInfo)); - if (NULL == pInfo->pRowBufs) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - return mJoinAddPageToBufList(pInfo->pRowBufs); -} - -static int32_t mJoinInitMergeCtx(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { +static int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; pCtx->lastEqTs = INT64_MIN; pCtx->hashCan = pJoin->probe->keyNum > 0; - pCtx->probeEqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows)); - pCtx->probeCreatedBlks = taosArrayInit(8, POINTER_BYTES); - - pCtx->buildEqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows)); - pCtx->buildCreatedBlks = taosArrayInit(8, POINTER_BYTES); + pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); + blockDataEnsureCapacity(pCtx->finBlk, TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc.totalRowSize)); if (pJoin->pFPreFilter) { - pCtx->midBlk = createOneDataBlock(pJoin->pResBlk, false); - blockDataEnsureCapacity(pCtx->midBlk, pJoin->pResBlk->info.rows); + pCtx->midBlk = createOneDataBlock(pCtx->finBlk, false); + blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity); } - pCtx->finBlk = pJoin->pResBlk; - - pCtx->blksCapacity = pJoin->pResBlk->info.rows * 2; + pCtx->blkThreshold = pCtx->finBlk->info.capacity * 0.5; - pCtx->resBlk = NULL; - return TSDB_CODE_SUCCESS; } -static int32_t mJoinInitCtx(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { +static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { +#if 0 pJoin->joinFps = &gMJoinFps[pJoin->joinType][pJoin->subType]; int32_t code = (*pJoin->joinFps->initJoinCtx)(pOperator, pJoin); if (code) { return code; } - - return TSDB_CODE_SUCCESS; +#else + return mJoinInitMergeCtx(pJoin, pJoinNode); +#endif } static void mJoinSetDone(SOperatorInfo* pOperator) { @@ -314,67 +296,17 @@ static void mJoinSetDone(SOperatorInfo* pOperator) { } } -static int32_t mJoinAddBlkToList(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pCtx, SSDataBlock* pBlock) { - SMJoinBlkInfo* pNew = taosMemoryCalloc(1, sizeof(SMJoinBlkInfo)); - if (NULL == pNew) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pNew->pBlk = pBlock; - pNew->inUse = true; - - if (NULL == pCtx->pTailBlk) { - pCtx->pTailBlk = pCtx->pHeadBlk = pNew; - pCtx->pCurrBlk = pCtx->pHeadBlk; - pCtx->blkIdx = 0; - pCtx->blkRowIdx = 0; - pCtx->blkNum = 1; - if (E_JOIN_TB_PROBE == pCtx->type) { - SColumnInfoData* probeCol = taosArrayGet(pCtx->pCurrBlk->pBlk, pCtx->pTbInfo->primCol->srcSlot); - pCtx->blkCurTs = *(int64_t*)probeCol->pData; - } - } else { - pCtx->pTailBlk->pNext = pNew; - pCtx->blkNum++; - if (E_JOIN_TB_PROBE == pCtx->type) { - SMJoinTsJoinCtx* pTsCtx = &pJoin->ctx.mergeCtx.tsJoinCtx; - pCtx->blkCurTs = pTsCtx->probeTs[pCtx->blkRowIdx]; - } - } - - return TSDB_CODE_SUCCESS; -} - -static FORCE_INLINE void mLefeJoinUpdateTsJoinCtx(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx, SMJoinTableCtx* pBuildCtx) { - pCtx->probeRowNum = pProbeCtx->pCurrBlk->pBlk->info.rows; - pCtx->buildRowNum = pBuildCtx->pCurrBlk->pBlk->info.rows; - SColumnInfoData* probeCol = taosArrayGet(pProbeCtx->pCurrBlk->pBlk, pProbeCtx->pTbInfo->primCol->srcSlot); - SColumnInfoData* buildCol = taosArrayGet(pBuildCtx->pHeadBlk->pBlk, pBuildCtx->pTbInfo->primCol->srcSlot); - pCtx->probeTs = (int64_t*)probeCol->pData; - pCtx->probeEndTs = (int64_t*)probeCol->pData + pCtx->probeRowNum - 1; - pCtx->buildTs = (int64_t*)buildCol->pData; - pCtx->buildEndTs = (int64_t*)buildCol->pData + pCtx->buildRowNum - 1; -} - -static FORCE_INLINE void mJoinResetTsJoinCtx(SMJoinTsJoinCtx* pCtx) { - pCtx->inSameTsGrp = false; - pCtx->inDiffTsGrp = false; - pCtx->nextProbeRow = false; - pCtx->pLastGrpPair = NULL; -} - - -static void mLeftJoinGrpNonEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst) { +static int32_t mLeftJoinGrpNonEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp) { SMJoinTableInfo* probe = pJoin->probe; SMJoinTableInfo* build = pJoin->build; int32_t currRows = append ? pRes->info.rows : 0; - int32_t firstRows = GRP_REMAIN_ROWS(pFirst); + int32_t firstRows = GRP_REMAIN_ROWS(pGrp); for (int32_t c = 0; c < probe->finNum; ++c) { SMJoinColMap* pFirstCol = probe->finCols + c; - SColumnInfoData* pInCol = taosArrayGet(pFirst->blk, pFirstCol->srcSlot); + SColumnInfoData* pInCol = taosArrayGet(pGrp->blk, pFirstCol->srcSlot); SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot); - colDataAssignNRows(pOutCol, currRows, pInCol, pFirst->readIdx, firstRows); + colDataAssignNRows(pOutCol, currRows, pInCol, pGrp->readIdx, firstRows); } for (int32_t c = 0; c < build->finNum; ++c) { @@ -384,9 +316,10 @@ static void mLeftJoinGrpNonEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, } pRes->info.rows = append ? (pRes->info.rows + firstRows) : firstRows; + return TSDB_CODE_SUCCESS; } -static void mLeftJoinGrpEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond) { +static int32_t mLeftJoinGrpEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond) { SMJoinTableInfo* probe = pJoin->probe; SMJoinTableInfo* build = pJoin->build; int32_t currRows = append ? pRes->info.rows : 0; @@ -415,47 +348,44 @@ static void mLeftJoinGrpEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, boo } } - pRes->info.rows = append ? (pRes->info.rows + firstRows) : firstRows; + pRes->info.rows = append ? (pRes->info.rows + firstRows * secondRows) : firstRows * secondRows; + return TSDB_CODE_SUCCESS; } -static void mLeftJoinMergeFullCart(SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { +static int32_t mLeftJoinMergeFullCart(SMJoinMergeCtx* pCtx) { int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; - SMJoinTableInfo* probe = pJoin->probe; - SMJoinTableInfo* build = pJoin->build; - SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); + SMJoinTableInfo* probe = pCtx->pJoin->probe; + SMJoinTableInfo* build = pCtx->pJoin->build; + SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0); int32_t buildGrpNum = taosArrayGetSize(build->eqGrps); int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); + int32_t probeEndIdx = probeGrp->endIdx; - pCtx->eqCart = true; - - if (probeRows * build->grpRemainRows <= rowsLeft) { + if (probeRows * build->grpTotalRows <= rowsLeft) { for (; build->grpIdx < buildGrpNum; ++build->grpIdx) { SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); - mLeftJoinGrpEqCart(pJoin, pCtx->finBlk, true, probeGrp, buildGrp); + MJ_ERR_RET(mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); } - probe->grpIdx++; - build->grpRemainRows = 0; + pCtx->grpRemains = false; - return true; + return TSDB_CODE_SUCCESS; } - for (; probeGrp->readIdx <= probeGrp->endIdx; ++probeGrp->readIdx) { + for (; !GRP_DONE(probeGrp); ++probeGrp->readIdx, build->grpIdx = 0) { + probeGrp->endIdx = probeGrp->readIdx; for (; build->grpIdx < buildGrpNum; ++build->grpIdx) { SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); - int32_t probeEndIdx = probeGrp->endIdx; - probeGrp->endIdx = probeGrp->readIdx; if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) { - mLeftJoinGrpEqCart(pJoin, pCtx->finBlk, true, probeGrp, buildGrp); + MJ_ERR_RET(mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); rowsLeft -= GRP_REMAIN_ROWS(buildGrp); - probeGrp->endIdx = probeEndIdx; continue; } int32_t buildEndIdx = buildGrp->endIdx; buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1; - mLeftJoinGrpEqCart(pJoin, pCtx->finBlk, true, probeGrp, buildGrp); + mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp); buildGrp->readIdx += rowsLeft; buildGrp->endIdx = buildEndIdx; rowsLeft = 0; @@ -467,131 +397,166 @@ static void mLeftJoinMergeFullCart(SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pC } } + probeGrp->endIdx = probeEndIdx; + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + + return TSDB_CODE_SUCCESS; } -static void mLeftJoinCopyMergeMidBlk(SSDataBlock* pMid, SSDataBlock* pFin) { +static int32_t mLeftJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) { SSDataBlock* pLess = NULL; SSDataBlock* pMore = NULL; - if (pMid->info.rows < pFin->info.rows) { - pLess = pMid; - pMore = pFin; + if ((*ppMid)->info.rows < ppFin->info.rows) { + pLess = (*ppMid); + pMore = (*ppFin); } else { - pLess = pFin; - pMore = pMid; + pLess = (*ppFin); + pMore = (*ppMid); } - int32_t totalRows = pMid->info.rows + pFin->info.rows; + int32_t totalRows = pMore->info.rows + pLess->info.rows; if (totalRows <= pMore->info.capacity) { - blockDataMerge(pMore, pLess); + MJ_ERR_RET(blockDataMerge(pMore, pLess)); + tDataBlkReset(pLess); + pCtx->midRemains = false; } else { - + int32_t copyRows = pMore->info.capacity - pMore->info.rows; + MJ_ERR_RET(blockDataMergeNRows(pMore, pLess, pLess->info.rows - copyRows, copyRows)); + pCtx->midRemains = true; } + + if (pMore != (*ppFin)) { + TSWAP(*ppMid, *ppFin); + } + + return TSDB_CODE_SUCCESS; } -static void mLeftJoinMergeSeqCart(SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { - SMJoinTableInfo* probe = pJoin->probe; - SMJoinTableInfo* build = pJoin->build; +static int32_t mLeftJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { + SMJoinTableInfo* probe = pCtx->pJoin->probe; + SMJoinTableInfo* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); int32_t buildGrpNum = taosArrayGetSize(build->eqGrps); int32_t probeEndIdx = probeGrp->endIdx; int32_t rowsLeft = pCtx->midBlk->info.capacity; bool contLoop = true; - pCtx->eqCart = true; + blockDataReset(pCtx->midBlk); do { - for (; !GRP_DONE(probeGrp->readIdx) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) { + for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx, probeGrp->readMatch = false, build->grpIdx = 0) { probeGrp->endIdx = probeGrp->readIdx; for (; build->grpIdx < buildGrpNum; ++build->grpIdx) { SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) { - mLeftJoinGrpEqCart(pJoin, pCtx->midBlk, true, probeGrp, buildGrp); + MJ_ERR_RET(mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp)); rowsLeft -= GRP_REMAIN_ROWS(buildGrp); continue; } int32_t buildEndIdx = buildGrp->endIdx; buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1; - mLeftJoinGrpEqCart(pJoin, pCtx->midBlk, true, probeGrp, buildGrp); + MJ_ERR_RET(mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp)); buildGrp->readIdx += rowsLeft; buildGrp->endIdx = buildEndIdx; rowsLeft = 0; break; } - doFilter(pCtx->midBlk, pJoin->pFPreFilter, NULL); if (pCtx->midBlk->info.rows > 0) { - probeGrp->readMatch = true; - } else if (build->grpIdx == buildGrpNum && !probeGrp->readMatch) { - mLeftJoinGrpNonEqCart(pJoin, pCtx->finBlk, true, probeGrp); - continue; - } + MJ_ERR_RET(doFilter(pCtx->midBlk, pCtx->pJoin->pFPreFilter, NULL)); + if (pCtx->midBlk->info.rows > 0) { + probeGrp->readMatch = true; + } + } + + if (0 == pCtx->midBlk->info.rows) { + if (build->grpIdx == buildGrpNum) { + if (!probeGrp->readMatch) { + MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp)); + } + + continue; + } + + break; + } else { + MJ_ERR_RET(mLeftJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk)); + + if (pCtx->midRemains) { + contLoop = false; + break; + } + + if (build->grpIdx == buildGrpNum) { + continue; + } - if (pCtx->midBlk->info.rows >= pJoin->pOperator->resultInfo.threshold) { - contLoop = false; break; } - - rowsLeft = pCtx->midBlk->info.capacity - pCtx->midBlk->info.rows; - - mLeftJoinCopyMergeMidBlk(&pCtx->midBlk, &pCtx->finBlk); - break; } if (GRP_DONE(probeGrp->readIdx) || BLK_IS_FULL(pCtx->finBlk)) { break; } + + rowsLeft = pCtx->midBlk->info.capacity; } while (contLoop); probeGrp->endIdx = probeEndIdx; pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + + return TSDB_CODE_SUCCESS; } -static void mLeftJoinMergeCart(SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { - if (NULL == pJoin->pFPreFilter) { - mLeftJoinMergeFullCart(pJoin, pCtx); - } else { - mLeftJoinMergeSeqCart(pJoin, pCtx); - } +static int32_t mLeftJoinMergeCart(SMJoinMergeCtx* pCtx) { + return (NULL == pCtx->pJoin->pFPreFilter) ? mLeftJoinMergeFullCart(pCtx) : mLeftJoinMergeSeqCart(pCtx); } -static void mLeftJoinNonEqCart(SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { +static int32_t mLeftJoinNonEqCart(SMJoinMergeCtx* pCtx) { int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; SMJoinGrpRows* probeGrp = &pCtx->probeNEqGrp; int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); - pCtx->eqCart = false; + pCtx->lastEqGrp = false; if (probeRows <= rowsLeft) { - mLeftJoinGrpNonEqCart(pJoin, pCtx->finBlk, true, probeGrp, NULL); + MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp)); probeGrp->readIdx = probeGrp->endIdx + 1; pCtx->grpRemains = false; } else { int32_t probeEndIdx = probeGrp->endIdx; probeGrp->endIdx = probeGrp->readIdx + rowsLeft - 1; - mLeftJoinGrpNonEqCart(pJoin, pCtx->finBlk, true, probeGrp, NULL); + MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp)); probeGrp->readIdx = probeGrp->endIdx + 1; probeGrp->endIdx = probeEndIdx; pCtx->grpRemains = true; } + + return TSDB_CODE_SUCCESS; } static bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableInfo* ppTb) { - if (!(*ppTb)->dsFetchDone && (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows)) { + if ((*ppTb)->dsFetchDone) { + return (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows) ? false : true; + } + + if (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows) { (*ppBlk) = getNextBlockFromDownstreamRemain(pJoin->pOperator, (*ppTb)->downStreamIdx); (*ppTb)->dsInitDone = true; - qDebug("merge join %s table got %" PRId64 " rows block", MJOIN_TBTYPE(ppTb->type), (*ppBlk) ? (*ppBlk)->info.rows : 0); + qDebug("%s merge join %s table got %" PRId64 " rows block", GET_TASKID(pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(ppTb->type), (*ppBlk) ? (*ppBlk)->info.rows : 0); *pIdx = 0; if (NULL == (*ppBlk)) { (*ppTb)->dsFetchDone = true; } + return ((*ppBlk) == NULL) ? false : true; } @@ -608,10 +573,12 @@ static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoi buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, &pJoin->build); } - if (NULL == pJoin->probe->blk) { + if (!probeGot) { mJoinSetDone(pOperator); return false; - } else if (buildGot && probeGot) { + } + + if (buildGot) { SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot); SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot); if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) { @@ -662,12 +629,13 @@ static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotI } -static void mJoinBuildEqGroups(SOperatorInfo* pOperator, SMJoinTableInfo* pTable, int64_t timestamp, bool* allBlk, bool restart) { +static void mJoinBuildEqGroups(SOperatorInfo* pOperator, SMJoinTableInfo* pTable, int64_t timestamp, bool* wholeBlk, bool restart) { SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); SMJoinGrpRows* pGrp = NULL; - int32_t if (restart) { + pTable->grpTotalRows = 0; + pTable->grpIdx = 0; pGrp = taosArrayGet(pTable->eqGrps, 0); } else { pGrp = taosArrayReserve(pTable->eqGrps, 1); @@ -686,36 +654,35 @@ static void mJoinBuildEqGroups(SOperatorInfo* pOperator, SMJoinTableInfo* pTable continue; } - pTable->grpRowsNum += pGrp->endIdx - pGrp->beginIdx + 1; - pTable->grpRemainRows = pTable->grpRowsNum; + pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1; return; } - if (allBlk) { - *allBlk = true; + if (wholeBlk) { + *wholeBlk = true; if (0 == pGrp->beginIdx) { pGrp->blk = createOneDataBlock(pTable->blk, true); } else { pGrp->blk = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, pGrp->endIdx - pGrp->beginIdx + 1); + pGrp->endIdx -= pGrp->beginIdx; + pGrp->beginIdx = 0; + pGrp->readIdx = 0; } taosArrayPush(pTable->createdBlks, &pGrp->blk); - pGrp->beginIdx = 0; } - pTable->grpRowsNum += pGrp->endIdx - pGrp->beginIdx + 1; - pTable->grpRemainRows = pTable->grpRowsNum; + pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1; } -static int32_t mJoinRetrieveSameTsRows(SOperatorInfo* pOperator, SMJoinTableInfo* pTable, int64_t timestamp) { - SMJoinOperatorInfo* pJoin = pOperator->info; - bool allBlk = false; +static int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableInfo* pTable, int64_t timestamp) { + bool wholeBlk = false; - mJoinBuildEqGroups(pOperator, pTable, timestamp, &allBlk, true); + mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, true); - while (allBlk) { + while (wholeBlk) { pTable->blk = getNextBlockFromDownstreamRemain(pOperator, pTable->downStreamIdx); - qDebug("merge join %s table got block for same ts, rows:%" PRId64, MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0); + qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0); pTable->blkRowIdx = 0; @@ -724,11 +691,11 @@ static int32_t mJoinRetrieveSameTsRows(SOperatorInfo* pOperator, SMJoinTableInfo break; } - allBlk = false; - mJoinBuildEqGroups(pOperator, pTable, timestamp, &allBlk, false); + wholeBlk = false; + mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, false); } - return 0; + return TSDB_CODE_SUCCESS; } static int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableInfo* pTable) { @@ -797,6 +764,7 @@ static int32_t mJoinGetAvailableGrpArray(SMJoinTableInfo* pTable, SArray** ppRes do { if (pTable->grpArrayIdx < taosArrayGetSize(pTable->pGrpArrays)) { *ppRes = taosArrayGetP(pTable->pGrpArrays, pTable->grpArrayIdx++); + taosArrayClear(*ppRes); return TSDB_CODE_SUCCESS; } @@ -816,10 +784,8 @@ static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDat SArray** pGrpRows = tSimpleHashGet(pBuild->pGrpHash, pBuild->keyData, keyLen); if (!pGrpRows) { SArray* pNewGrp = NULL; - int32_t code = mJoinGetAvailableGrpArray(pBuild, &pNewGrp); - if (code) { - return code; - } + MJ_ERR_RET(mJoinGetAvailableGrpArray(pBuild, &pNewGrp)); + taosArrayPush(pNewGrp, &pos); tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, POINTER_BYTES); } else { @@ -830,50 +796,59 @@ static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDat } -static void mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableInfo* pTable) { +static int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableInfo* pTable) { + size_t bufLen = 0; + + tSimpleHashClear(pJoin->build->pGrpHash); + pJoin->build->grpArrayIdx = 0; + int32_t grpNum = taosArrayGetSize(pTable->eqGrps); for (int32_t g = 0; g < grpNum; ++g) { SMJoinGrpRows* pGrp = taosArrayGet(pTable->eqGrps, g); - int32_t code = mJoinSetKeyColsData(pGrp->blk, pTable); - if (code) { - return code; - } + MJ_ERR_RET(mJoinSetKeyColsData(pGrp->blk, pTable)); int32_t grpRows = GRP_REMAIN_ROWS(pGrp); - size_t bufLen = 0; for (int32_t r = 0; r < grpRows; ++r) { if (mJoinCopyKeyColsDataToBuf(pTable, r, &bufLen)) { continue; } - code = mJoinAddRowToHash(pJoin, bufLen, pGrp->blk, pGrp->beginIdx + r); - if (code) { - return code; - } + + MJ_ERR_RET(mJoinAddRowToHash(pJoin, bufLen, pGrp->blk, pGrp->beginIdx + r)); } } + + return TSDB_CODE_SUCCESS; } -static bool mLeftJoinHashRowCart(SMJoinMergeCtx* pCtx, SMJoinGrpRows* probeGrp, SMJoinTableInfo* probe, SMJoinTableInfo* build) { - int32_t rowsLeft = pCtx->resBlk->info.capacity - pCtx->resBlk->info.rows; +static bool mLeftJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableInfo* probe, SMJoinTableInfo* build) { + int32_t rowsLeft = append ? (pBlk->info.capacity - pBlk->info.rows) : pBlk->info.capacity; + if (rowsLeft <= 0) { + return false; + } + int32_t buildGrpRows = taosArrayGetSize(build->pHashCurGrp); int32_t grpRows = buildGrpRows - build->grpRowIdx; + if (grpRows <= 0) { + return true; + } + int32_t actRows = TMIN(grpRows, rowsLeft); - int32_t currRows = pCtx->noColCond ? pCtx->resBlk->info.rows : 0; + int32_t currRows = append ? pBlk->info.rows : 0; for (int32_t c = 0; c < probe->finNum; ++c) { SMJoinColMap* pFirstCol = probe->finCols + c; SColumnInfoData* pInCol = taosArrayGet(probeGrp->blk, pFirstCol->srcSlot); - SColumnInfoData* pOutCol = taosArrayGet(pCtx->resBlk->pDataBlock, pFirstCol->dstSlot); + SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pFirstCol->dstSlot); if (colDataIsNull_s(pInCol, probeGrp->readIdx)) { colDataSetNItemsNull(pOutCol, currRows, actRows); } else { - colDataSetNItems(pOutCol, currRows, colDataGetData(pInCol, probeGrp->beginIdx), actRows, true); + colDataSetNItems(pOutCol, currRows, colDataGetData(pInCol, probeGrp->readIdx), actRows, true); } } for (int32_t c = 0; c < build->finNum; ++c) { SMJoinColMap* pSecondCol = build->finCols + c; - SColumnInfoData* pOutCol = taosArrayGet(pCtx->resBlk->pDataBlock, pSecondCol->dstSlot); + SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pSecondCol->dstSlot); for (int32_t r = 0; r < actRows; ++r) { SMJoinRowPos* pRow = taosArrayGet(build->pHashCurGrp, r); SColumnInfoData* pInCol = taosArrayGet(pRow->pBlk, pSecondCol->srcSlot); @@ -881,7 +856,8 @@ static bool mLeftJoinHashRowCart(SMJoinMergeCtx* pCtx, SMJoinGrpRows* probeGrp, } } - pCtx->resBlk->info.rows += actRows; + pBlk->info.rows += actRows; + if (actRows == grpRows) { build->grpRowIdx = -1; } else { @@ -895,109 +871,221 @@ static bool mLeftJoinHashRowCart(SMJoinMergeCtx* pCtx, SMJoinGrpRows* probeGrp, return true; } -static void mLeftJoinHashCart(SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { - SMJoinTableInfo* probe = pJoin->probe; - SMJoinTableInfo* build = pJoin->build; +static int32_t mLeftJoinHashFullCart(SMJoinMergeCtx* pCtx) { + SMJoinTableInfo* probe = pCtx->pJoin->probe; + SMJoinTableInfo* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); - if (pJoin->build->grpRowIdx >= 0) { - bool contLoop = mLeftJoinHashRowCart(pCtx, probeGrp, probe, build); + if (build->grpRowIdx >= 0) { + bool contLoop = mLeftJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build); if (build->grpRowIdx < 0) { probeGrp->readIdx++; } if (!contLoop) { - pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; - return; + goto _return; } } - pCtx->eqCart = true; - size_t bufLen = 0; - for (; probeGrp->readIdx < probeGrp->endIdx; ++probeGrp->readIdx) { + int32_t probeEndIdx = probeGrp->endIdx; + for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) { if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) { continue; } SArray** pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen); if (NULL == pGrp) { - + probeGrp->endIdx = probeGrp->readIdx; + MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp)); + probeGrp->endIdx = probeEndIdx; } else { build->pHashCurGrp = *pGrp; build->grpRowIdx = 0; - if (NULL == pJoin->pPreFilter) { - if (!mLeftJoinHashRowCart(pCtx, probeGrp, probe, build)) { - break; - } - } else { - + if (!mLeftJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build)) { + break; } } } +_return: + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + + return TSDB_CODE_SUCCESS; } -static int32_t mLeftJoinProcessEqualGrp(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes) { - SMJoinOperatorInfo* pJoin = pOperator->info; - SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; +static int32_t mLeftJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop) { + SMJoinTableInfo* probe = pCtx->pJoin->probe; + SMJoinTableInfo* build = pCtx->pJoin->build; + SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); - mJoinBuildEqGroups(pOperator, pJoin->probe, timestamp, NULL, true); - mJoinRetrieveSameTsRows(pOperator, pJoin->build, timestamp); - if (pCtx->hashCan && REACH_HJOIN_THRESHOLD(pJoin->probe, pJoin->build)) { - int32_t code = mJoinMakeBuildTbHash(pJoin, pJoin->build); - if (code) { - return code; - } - code = mJoinSetKeyColsData(pJoin->probe->blk, pJoin->probe); - if (code) { - return code; + blockDataReset(pCtx->midBlk); + + do { + mLeftJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build); + if (build->grpRowIdx < 0) { + probeGrp->readIdx++; } - pCtx->hashJoin = true; - mLeftJoinHashCart(pJoin, pCtx); - } else { - mLeftJoinMergeCart(pJoin, pCtx); - } - + if (pCtx->midBlk->info.rows > 0) { + MJ_ERR_RET(doFilter(pCtx->midBlk, pCtx->pJoin->pPreFilter, NULL)); + if (pCtx->midBlk->info.rows > 0) { + probeGrp->readMatch = true; + } + } + + if (0 == pCtx->midBlk->info.rows) { + if (build->grpRowIdx < 0) { + if (!probeGrp->readMatch) { + MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp)); + } + + probeGrp->readMatch = false; + break; + } + + continue; + } else { + MJ_ERR_RET(mLeftJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk)); + + if (pCtx->midRemains) { + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + *contLoop = false; + return TSDB_CODE_SUCCESS; + } + + if (build->grpRowIdx < 0) { + probeGrp->readMatch = false; + break; + } + + continue; + } + } while (true); + + *contLoop = true; return TSDB_CODE_SUCCESS; } -static bool mLeftJoinHandleRowRemains(struct SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx, SSDataBlock* pRes) { - if (pCtx->eqCart) { - if (pCtx->hashJoin) { - mLeftJoinHashCart(pJoin, pCtx); - } else { - mLeftJoinMergeCart(pJoin, pCtx); +static int32_t mLeftJoinHashSeqCart(SMJoinMergeCtx* pCtx) { + SMJoinTableInfo* probe = pCtx->pJoin->probe; + SMJoinTableInfo* build = pCtx->pJoin->build; + SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0); + bool contLoop = false; + + if (build->grpRowIdx >= 0) { + MJ_ERR_RET(mLeftJoinHashGrpCartFilter(pCtx, &contLoop)); + if (!contLoop) { + goto _return; } - if (pRes->info.rows >= pOperator->resultInfo.threshold) { - return false; + } + + size_t bufLen = 0; + int32_t probeEndIdx = probeGrp->endIdx; + for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk);) { + if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) { + continue; } - return true; + SArray** pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen); + if (NULL == pGrp) { + probeGrp->endIdx = probeGrp->readIdx; + MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp)); + probeGrp->endIdx = probeEndIdx; + probeGrp->readIdx++; + probeGrp->readMatch = false; + } else { + build->pHashCurGrp = *pGrp; + build->grpRowIdx = 0; + + MJ_ERR_RET(mLeftJoinHashGrpCartFilter(pCtx, &contLoop)); + if (!contLoop) { + break; + } + } } - - mLeftJoinNonEqCart(pJoin, pCtx); - if (pRes->info.rows >= pOperator->resultInfo.threshold) { - return false; - } - - return true; + +_return: + + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + + return TSDB_CODE_SUCCESS; } -static void mLeftJoinDo(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { + +static int32_t mLeftJoinHashCart(SMJoinMergeCtx* pCtx) { + return (NULL == pCtx->pJoin->pPreFilter) ? mLeftJoinHashFullCart(pCtx) : mLeftJoinHashSeqCart(pCtx); +} + +static int32_t mLeftJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastBuildGrp) { + SMJoinOperatorInfo* pJoin = pCtx->pJoin; + + pCtx->lastEqGrp = true; + + mJoinBuildEqGroups(pJoin->probe, timestamp, NULL, true); + if (!lastBuildGrp) { + mJoinRetrieveEqGrpRows(pJoin->pOperator, pJoin->build, timestamp); + } else { + pJoin->build->grpIdx = 0; + } + + if (pCtx->hashCan && REACH_HJOIN_THRESHOLD(pJoin->probe, pJoin->build)) { + if (!lastBuildGrp || NULL == pJoin->build->pGrpHash) { + MJ_ERR_RET(mJoinMakeBuildTbHash(pJoin, pJoin->build)); + MJ_ERR_RET(mJoinSetKeyColsData(pJoin->probe->blk, pJoin->probe)); + } + + pCtx->hashJoin = true; + + return mLeftJoinHashCart(pJoin, pCtx); + } + + return mLeftJoinMergeCart(pJoin, pCtx); +} + +static bool mLeftJoinHandleMidRemains(SMJoinMergeCtx* pCtx) { + ASSERT(0 < pCtx->midBlk.info.rows); + + TSWAP(pCtx->midBlk, pCtx->finBlk); + + return (pCtx->finBlk->info.rows >= pCtx->blkThreshold) ? false : true; +} + + +static int32_t mLeftJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { + if (pCtx->lastEqGrp) { + return (pCtx->hashJoin) ? mLeftJoinHashCart(pCtx) : mLeftJoinMergeCart(pCtx); + } + + return mLeftJoinNonEqCart(pCtx); +} + +static SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoin = pOperator->info; SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; + int32_t code = TSDB_CODE_SUCCESS; int64_t probeTs = 0; int64_t buildTs = 0; SColumnInfoData* pBuildCol = NULL; SColumnInfoData* pProbeCol = NULL; bool asc = (pJoin->inputTsOrder == TSDB_ORDER_ASC) ? true : false; - if (pCtx->grpRemains && !mLeftJoinHandleRowRemains(pOperator, pJoin, pCtx, pRes)) { - return; + blockDataReset(pCtx->finBlk); + + if (pCtx->midRemains) { + MJ_ERR_JRET(mLeftJoinHandleMidRemains(pCtx)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + } + + if (pCtx->grpRemains) { + MJ_ERR_JRET(mLeftJoinHandleGrpRemains(pCtx)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } } do { @@ -1005,53 +1093,72 @@ static void mLeftJoinDo(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { break; } - SET_TABLE_CUR_TS(pBuildCol, buildTs, pJoin->build); - SET_TABLE_CUR_TS(pProbeCol, probeTs, pJoin->probe); + MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); + MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); if (probeTs == pCtx->lastEqTs) { - mLeftJoinProcessEqualGrp(pOperator, probeTs, pRes); - if (pRes->info.rows >= pOperator->resultInfo.threshold) { - return; + MJ_ERR_JRET(mLeftJoinProcessEqualGrp(pCtx, probeTs, true)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; } - if (pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) { - SET_TABLE_CUR_TS(pProbeCol, probeTs, pJoin->probe); - } else { + if (MJOIN_TB_ROWS_DONE(pJoin->probe)) { continue; + } else { + MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); } } - while (pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows && pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { + while (!MJOIN_TB_ROWS_DONE(pJoin->probe) && !MJOIN_TB_ROWS_DONE(pJoin->build)) { if (probeTs == buildTs) { pCtx->lastEqTs = probeTs; - mLeftJoinProcessEqualGrp(pOperator, probeTs, pRes); - if (pRes->info.rows >= pOperator->resultInfo.threshold) { - return; + MJ_ERR_JRET(mLeftJoinProcessEqualGrp(pCtx, probeTs, false)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; } - SET_TABLE_CUR_TS(pBuildCol, buildTs, pJoin->build); - SET_TABLE_CUR_TS(pProbeCol, probeTs, pJoin->probe); + MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build); + MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); } else if (LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)) { pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx; + pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx; + pCtx->probeNEqGrp.endIdx = pCtx->probeNEqGrp.beginIdx; - do { - pCtx->probeNEqGrp.endIdx = pJoin->probe->blkRowIdx; - probeTs = *((int64_t*)pProbeCol->pData + (++pJoin->probe->blkRowIdx)); - } while (pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows && LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)); + while (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) { + MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); + if (LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)) { + pCtx->probeNEqGrp.endIdx = pJoin->probe->blkRowIdx; + continue; + } + + break; + } - mLeftJoinNonEqCart(pJoin, pCtx); - - if (pRes->info.rows >= pOperator->resultInfo.threshold) { - return; + MJ_ERR_JRET(mLeftJoinNonEqCart(pCtx)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; } } else { - buildTs = *((int64_t*)pBuildCol->pData + (++pJoin->build->blkRowIdx)); - while (pJoin->build->blkRowIdx < pJoin->build->blk->info.rows && LEFT_JOIN_DISCRAD(asc, probeTs, buildTs)) { - buildTs = *((int64_t*)pBuildCol->pData + (++pJoin->build->blkRowIdx)); + while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { + MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build); + if (LEFT_JOIN_DISCRAD(asc, probeTs, buildTs)) { + continue; + } + + break; } } } } while (true); + +_return: + + if (code) { + pJoin->errCode = code; + return NULL; + } + + return pCtx->finBlk; } @@ -1074,16 +1181,19 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { } SSDataBlock* pBlock = NULL; - //blockDataCleanup(pJoin->pRes); - while (true) { - pBlock = (*pJoin->joinFp)(pOperator); + //pBlock = (*pJoin->joinFps)(pOperator); + pBlock = mLeftJoinDo(pOperator); if (NULL == pBlock) { + if (pJoin->errCode) { + T_LONG_JMP(pOperator->pTaskInfo->env, pJoin->errCode); + } break; } if (pJoin->pFinFilter != NULL) { doFilter(pBlock, pJoin->pFinFilter, NULL); } + if (pBlock->info.rows > 0 || pOperator->status == OP_EXEC_DONE) { break; } @@ -1093,14 +1203,7 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } - if (pBlock->info.rows > 0) { - pJoin->resRows += pBlock->info.rows; - qDebug("%s merge join returns res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pBlock->info.rows); - return pBlock; - } else { - qDebug("%s total merge join res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pJoin->resRows); - return NULL; - } + return (pBlock && pBlock->info.rows > 0) ? pBlock : NULL; } @@ -1122,18 +1225,15 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t goto _error; } - int32_t numOfCols = 0; - pInfo->pResBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); - blockDataEnsureCapacity(pInfo->pResBlk, TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc.totalRowSize)); - setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); + mJoinSetBuildAndProbeTable(pInfo, pJoinNode); + mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); - mJoinSetBuildAndProbeTable(pInfo, pJoinNode); - mJoinInitCtx(pOperator, pInfo); + MJ_ERR_JRET(mJoinInitCtx(pInfo, pJoinNode)); code = mJoinBuildResColMap(pInfo, pJoinNode); if (code) {