From 97567f74cbcbefb1b5aa980b3110471df27a67b5 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 18 Jan 2024 16:50:47 +0800 Subject: [PATCH] enh: support asof join --- source/libs/executor/inc/mergejoin.h | 122 ++- source/libs/executor/src/mergejoin.c | 785 +++++++++++++++++-- source/libs/executor/src/mergejoinoperator.c | 61 +- 3 files changed, 884 insertions(+), 84 deletions(-) diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 6107ea2c40..0e04bf7c0c 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -117,6 +117,8 @@ typedef struct SMJoinTableCtx { int32_t grpIdx; bool noKeepEqGrpRows; bool multiEqGrpRows; + int64_t eqRowLimit; + int64_t eqRowNum; SArray* eqGrps; SArray* createdBlks; @@ -150,20 +152,37 @@ typedef struct SMJoinGrpRows { bool readMatch; } SMJoinGrpRows; +#define MJOIN_COMMON_CTX \ + struct SMJoinOperatorInfo* pJoin; \ + bool ascTs; \ + bool grpRemains; \ + SSDataBlock* finBlk; \ + bool lastEqGrp; \ + bool lastProbeGrp; \ + int32_t blkThreshold; \ + int64_t jLimit + +typedef struct SMJoinCommonCtx { + MJOIN_COMMON_CTX; +} SMJoinCommonCtx; typedef struct SMJoinMergeCtx { + // KEEP IT FIRST struct SMJoinOperatorInfo* pJoin; - bool ascTs; - bool hashCan; - bool keepOrder; - bool grpRemains; - bool midRemains; - bool nmatchRemains; - bool lastEqGrp; + bool ascTs; + bool grpRemains; + SSDataBlock* finBlk; + bool lastEqGrp; bool lastProbeGrp; int32_t blkThreshold; + int64_t jLimit; + // KEEP IT FIRST + + bool hashCan; + bool keepOrder; + bool midRemains; + bool nmatchRemains; SSDataBlock* midBlk; - SSDataBlock* finBlk; int64_t lastEqTs; SMJoinGrpRows probeNEqGrp; SMJoinGrpRows buildNEqGrp; @@ -172,31 +191,53 @@ typedef struct SMJoinMergeCtx { joinCartFp mergeCartFp; } SMJoinMergeCtx; +typedef enum { + E_CACHE_NONE = 0, + E_CACHE_OUTBLK, + E_CACHE_INBLK +} SMJoinCacheMode; + +typedef struct SAsofJoinGrpRows { + SSDataBlock* blk; + bool clonedBlk; + int32_t blkRowIdx; + int32_t readIdx; +} SAsofJoinGrpRows; + + typedef struct SMJoinWinCache { int32_t pageLimit; - - int64_t rowsNum; - int32_t rowOffset; - int32_t outBlkIdx; - int32_t outRowOffset; - + int32_t outRowIdx; int32_t colNum; - SSDataBlock* blk; + int32_t rowNum; + int8_t grpIdx; + SArray* grps; + SSDataBlock* outBlk; } SMJoinWinCache; typedef struct SMJoinWindowCtx { + // KEEP IT FIRST struct SMJoinOperatorInfo* pJoin; bool ascTs; + bool grpRemains; + SSDataBlock* finBlk; + bool lastEqGrp; + bool lastProbeGrp; + int32_t blkThreshold; + int64_t jLimit; + // KEEP IT FIRST int32_t asofOpType; bool asofLowerRow; bool asofEqRow; bool asofGreaterRow; int64_t jLimit; - int32_t blkThreshold; - SSDataBlock* finBlk; - bool grpRemains; + bool eqPostDone; + int64_t lastTs; + bool rowRemains; + SMJoinGrpRows probeGrp; + SMJoinGrpRows buildGrp; SMJoinWinCache cache; } SMJoinWindowCtx; @@ -252,8 +293,8 @@ typedef struct SMJoinOperatorInfo { #define SET_SAME_TS_GRP_HJOIN(_pair, _octx) ((_pair)->hashJoin = (_octx)->hashCan && REACH_HJOIN_THRESHOLD(_pair)) -#define PROBE_TS_UNREACH(_order, _pts, _bts) ((_order) && (_pts) < (_bts)) || (!(_order) && (_pts) > (_bts)) -#define PROBE_TS_OVER(_order, _pts, _bts) ((_order) && (_pts) > (_bts)) || (!(_order) && (_pts) < (_bts)) +#define PROBE_TS_LOWER(_order, _pts, _bts) ((_order) && (_pts) < (_bts)) || (!(_order) && (_pts) > (_bts)) +#define PROBE_TS_GREATER(_order, _pts, _bts) ((_order) && (_pts) > (_bts)) || (!(_order) && (_pts) < (_bts)) #define GRP_REMAIN_ROWS(_grp) ((_grp)->endIdx - (_grp)->readIdx + 1) #define GRP_DONE(_grp) ((_grp)->readIdx > (_grp)->endIdx) @@ -268,13 +309,44 @@ typedef struct SMJoinOperatorInfo { #define MJOIN_SET_ROW_BITMAP(_b, _base, _idx) colDataClearNull_f((_b + _base), _idx) #define ASOF_EQ_ROW_INCLUDED(_op) (OP_TYPE_GREATER_EQUAL == (_op) || OP_TYPE_LOWER_EQUAL == (_op) || OP_TYPE_EQUAL == (_op)) -#define ASOF_LOWER_ROW_INCLUDED(_op) (OP_TYPE_LOWER_EQUAL == (_op) || OP_TYPE_LOWER_THAN == (_op)) -#define ASOF_GREATER_ROW_INCLUDED(_op) (OP_TYPE_GREATER_EQUAL == (_op) || OP_TYPE_GREATER_THAN == (_op)) +#define ASOF_LOWER_ROW_INCLUDED(_op) (OP_TYPE_GREATER_EQUAL == (_op) || OP_TYPE_GREATER_THAN == (_op)) +#define ASOF_GREATER_ROW_INCLUDED(_op) (OP_TYPE_LOWER_EQUAL == (_op) || OP_TYPE_LOWER_THAN == (_op)) +#define MJOIN_PUSH_BLK_TO_CACHE(_cache, _blk) \ + do { \ + ASSERT(taosArrayGetSize(_cache)->grps <= 1); \ + SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayReserve(_cache)->grps, 1); \ + (_cache)->rowNum += (_blk)->info.rows; \ + pGrp->blk = (_blk); \ + pGrp->beginIdx = 0; \ + } while (0) + +#define MJOIN_RESTORE_TB_BLK(_cache, _tb) \ + do { \ + SMJoinGrpRows* pGrp = taosArrayGet((_cache)->grps, 0); \ + if (NULL != pGrp) { \ + (_tb)->blk = pGrp->blk; \ + (_tb)->blkRowIdx = pGrp->beginIdx; \ + } else { \ + (_tb)->blk = NULL; \ + (_tb)->blkRowIdx = 0; \ + } \ + } while (0) + +#define MJOIN_POP_TB_BLK(_cache) \ + do { \ + SMJoinGrpRows* pGrp = taosArrayGet((_cache)->grps, 0); \ + if (NULL != pGrp) { \ + if (pGrp->blk == (_cache)->outBlk) { \ + blockDataCleanup(pGrp->blk); \ + } \ + taosArrayPopFrontBatch((_cache)->grps, 1); \ + } \ + } while (0) #define MJOIN_GET_TB_COL_TS(_col, _ts, _tb) \ do { \ - if (NULL != (_tb)->blk && (_tb)->blkRowIdx < (_tb)->blk->info.rows) { \ + if (NULL != (_tb)->blk && (_tb)->blkRowIdx < (_tb)->blk->info.rows) { \ (_col) = taosArrayGet((_tb)->blk->pDataBlock, (_tb)->primCol->srcSlot); \ (_ts) = *((int64_t*)(_col)->pData + (_tb)->blkRowIdx); \ } else { \ @@ -331,8 +403,8 @@ int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBl int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startGrpIdx, int32_t startRowIdx); int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startRowIdx); int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t *rowBitmapOffset); -int32_t mJoinProcessUnreachGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs); -int32_t mJoinProcessOverGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs); +int32_t mJoinProcessLowerGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs); +int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs); int32_t mJoinFilterAndKeepSingleRow(SSDataBlock* pBlock, SFilterInfo* pFilterInfo); int32_t mJoinFilterAndNoKeepRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo); diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index 4b75e7e8e8..6f4748b875 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -480,15 +480,15 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); - } else if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) { - MJ_ERR_JRET(mJoinProcessUnreachGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs)); + } else if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) { + MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } } else { while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build); - if (PROBE_TS_OVER(pCtx->ascTs, probeTs, buildTs)) { + if (PROBE_TS_GREATER(pCtx->ascTs, probeTs, buildTs)) { continue; } @@ -711,7 +711,7 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) { continue; } - if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) { + if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) { if (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) { MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); continue; @@ -1074,10 +1074,10 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { continue; } - if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) { - MJ_ERR_JRET(mJoinProcessUnreachGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs)); + if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) { + MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs)); } else { - MJ_ERR_JRET(mJoinProcessOverGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs)); + MJ_ERR_JRET(mJoinProcessGreaterGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs)); } if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { @@ -1410,7 +1410,7 @@ SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator) { continue; } - if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) { + if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) { if (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) { MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); continue; @@ -1671,15 +1671,15 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) { MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); - } else if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) { - MJ_ERR_JRET(mJoinProcessUnreachGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs)); + } else if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) { + MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } } else { while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build); - if (PROBE_TS_OVER(pCtx->ascTs, probeTs, buildTs)) { + if (PROBE_TS_GREATER(pCtx->ascTs, probeTs, buildTs)) { continue; } @@ -1727,24 +1727,340 @@ static bool mAsofJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoi mJoinSetDone(pOperator); return false; } + + break; + } while (true); - if (buildGot) { - SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot); - SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot); - if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) { - pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; + pCtx->probeGrp.blk = pJoin->probe->blk; + pCtx->buildGrp.blk = pJoin->build->blk; + + return true; +} + +int32_t mAsofJoinCalcRowNum(SMJoinWinCache* pCache, int64_t jLimit, int32_t newRows, int32_t* evictRows) { + if (pCache->outBlk->info.rows <= 0) { + *evictRows = 0; + return TMIN(jLimit, newRows); + } + + if ((pCache->outBlk->info.rows + newRows) <= jLimit) { + *evictRows = 0; + return newRows; + } + + if (newRows >= jLimit) { + *evictRows = pCache->outBlk->info.rows; + return jLimit; + } + + *evictRows = pCache->outBlk->info.rows + newRows - jLimit; + return newRows; +} + +int32_t mAsofJoinAddRowsToCache(SMJoinWindowCtx* pCtx, SMJoinGrpRows* pGrp, bool fromBegin) { + int32_t evictRows = 0; + SMJoinWinCache* pCache = &pCtx->cache; + int32_t rows = mAsofJoinCalcRowNum(pCache, pCtx->jLimit, pGrp->endIdx - pGrp->beginIdx + 1, &evictRows); + if (evictRows > 0) { + MJ_ERR_RET(blockDataTrimFirstRows(pCache->outBlk, evictRows)); + } + + int32_t startIdx = fromBegin ? pGrp->beginIdx : pGrp->endIdx - rows + 1; + return blockDataMergeNRows(pCache->outBlk, pGrp->blk, startIdx, rows); +} + +int32_t mAsofJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) { + SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); + + if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) { + return TSDB_CODE_SUCCESS; + } + + pGrp->beginIdx = pTable->blkRowIdx; + pGrp->readIdx = pTable->blkRowIdx; + + char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1); + if (timestamp != *(int64_t*)pEndVal) { + for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) { + char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx); + if (timestamp == *(int64_t*)pNextVal) { continue; } + + return TSDB_CODE_SUCCESS + } + } + + pGrp->endIdx = pTable->blk->info.rows - 1; + pTable->blkRowIdx = pTable->blk->info.rows; + + if (wholeBlk) { + *wholeBlk = true; + } + + return TSDB_CODE_SUCCESS; +} + + + +int32_t mAsofJoinProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { + pCtx->buildGrp.beginIdx = pJoin->build->blkRowIdx; + pCtx->buildGrp.readIdx = pCtx->buildGrp.beginIdx; + pCtx->buildGrp.endIdx = pCtx->buildGrp.beginIdx; + + while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { + MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pJoin->build); + if (PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) { + pCtx->buildGrp.endIdx = pJoin->build->blkRowIdx; + continue; + } + + break; + } + + pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx; + pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx; + pCtx->probeGrp.endIdx = pCtx->probeGrp.beginIdx; + + return mAsofJoinHandleProbeGreater(pCtx); +} + +int32_t mAsofJoinAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp) { + int64_t eqRowsNum = 0; + SMJoinGrpRows grp = {.blk = pTable->blk}; + + do { + SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); + + if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) { + return TSDB_CODE_SUCCESS; + } + + grp.beginIdx = pTable->blkRowIdx; + + char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1); + if (timestamp != *(int64_t*)pEndVal) { + for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) { + char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx); + if (timestamp == *(int64_t*)pNextVal) { + continue; + } + + break; + } + + grp.endIdx = pTable->blkRowIdx - 1; + } else { + grp.endIdx = pTable->blk->info.rows - 1; + pTable->blkRowIdx = pTable->blk->info.rows; + } + + if (eqRowsNum < pCtx->jLimit) { + MJ_ERR_RET(mAsofJoinAddRowsToCache(pCtx, &grp, false)); + } + + eqRowsNum += grp.endIdx - grp.beginIdx + 1; + + if (pTable->blkRowIdx == pTable->blk->info.rows) { + pTable->blk = getNextBlockFromDownstreamRemain(pOperator, pTable->downStreamIdx); + qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0); + + pTable->blkRowIdx = 0; + + if (NULL == pTable->blk) { + pTable->dsFetchDone = true; + break; + } + } else { + break; + } + } while (true); + + return TSDB_CODE_SUCCESS; +} + +int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) { + if (pCtx->cache.outBlk->info.rows <= 0) { + return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, probeGrp, true); + } + + int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; + SMJoinGrpRows* probeGrp = &pCtx->probeGrp; + SMJoinGrpRows buildGrp = {.blk = pCtx->cache.outBlk, .readIdx = pCtx->cache.outRowIdx, .endIdx = pCtx->cache.outBlk->info.rows - 1}; + int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); + int32_t probeEndIdx = probeGrp->endIdx; + int64_t totalResRows = (0 == pCtx->cache.outRowIdx) ? (probeRows * pCtx->cache.outBlk->info.rows) : + (pCtx->cache.outBlk->info.rows - pCtx->cache.outRowIdx + (probeRows - 1) * pCtx->cache.outBlk->info.rows); + + if (totalResRows <= rowsLeft) { + if (0 == pCtx->cache.outRowIdx) { + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp)); + + pCtx->grpRemains = false; + return TSDB_CODE_SUCCESS; + } + + probeGrp->endIdx = probeGrp->readIdx; + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp)); + if (++probeGrp->readIdx <= probeEndIdx) { + probeGrp->endIdx = probeEndIdx; + buildGrp->readIdx = 0; + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp)); + } + + pCtx->grpRemains = false; + return TSDB_CODE_SUCCESS; + } + + for (; !GRP_DONE(probeGrp) && rowsLeft > 0; ) { + if (0 == pCtx->cache.outRowIdx) { + int32_t grpNum = rowsLeft / pCtx->cache.outBlk->info.rows; + if (grpNum > 0) { + probeGrp->endIdx = probeGrp->readIdx + grpNum - 1; + buildGrp.readIdx = 0; + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); + rowsLeft -= grpNum * pCtx->cache.outBlk->info.rows; + pCtx->cache.outRowIdx = 0; + probeGrp->readIdx += grpNum; + continue; + } + } + + probeGrp->endIdx = probeGrp->readIdx; + buildGrp.readIdx = pCtx->cache.outRowIdx; + + int32_t grpRemainRows = pCtx->cache.outBlk->info.rows - pCtx->cache.outRowIdx; + if (rowsLeft >= grpRemainRows) { + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); + rowsLeft -= grpRemainRows; + pCtx->cache.outRowIdx = 0; + continue; + } + + buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1; + mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp); + pCtx->cache.outRowIdx += rowsLeft; + break; + } + + probeGrp->endIdx = probeEndIdx; + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + + return TSDB_CODE_SUCCESS; +} + +int32_t mAsofLowerDumpUpdateEqRows(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, bool lastBuildGrp) { + if (!pCtx->asofEqRow) { + MJ_ERR_RET(mAsofLowerDumpGrpCache(pCtx)); + if (pCtx->grpRemains) { + return TSDB_CODE_SUCCESS; + } + + if (!pCtx->eqPostDone && !lastBuildGrp) { + pCtx->eqPostDone = true; + return mAsofJoinAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs); + } + + return TSDB_CODE_SUCCESS; + } + + if (!pCtx->eqPostDone && !lastBuildGrp) { + pCtx->eqPostDone = true; + MJ_ERR_RET(mAsofJoinAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs)); + } + + return mAsofLowerDumpGrpCache(pCtx); +} + +int32_t mAsofLowerProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) { + SMJoinOperatorInfo* pJoin = pCtx->pJoin; + + pCtx->lastEqGrp = true; + pCtx->eqPostDone = false; + + MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, &pCtx->probeGrp)); + + return mAsofLowerDumpUpdateEqRows(pCtx, pJoin, lastBuildGrp); +} + + +int32_t mAsofLowerProcessLowerGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { + pCtx->lastEqGrp = false; + + pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx; + pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx; + pCtx->probeGrp.endIdx = pCtx->probeGrp.beginIdx; + + while (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) { + MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pJoin->probe); + if (PROBE_TS_LOWER(pCtx->ascTs, *probeTs, *buildTs)) { + pCtx->probeGrp.endIdx = pJoin->probe->blkRowIdx; + continue; + } + + break; + } + + return mAsofLowerDumpGrpCache(pCtx); +} + +int32_t mAsofLowerProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { + pCtx->lastEqGrp = false; + + pCtx->buildGrp.beginIdx = pJoin->build->blkRowIdx; + pCtx->buildGrp.readIdx = pCtx->buildGrp.beginIdx; + pCtx->buildGrp.endIdx = pCtx->buildGrp.beginIdx; + + while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { + MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pJoin->build); + if (PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) { + pCtx->buildGrp.endIdx = pJoin->build->blkRowIdx; + continue; + } + + break; + } + + pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx; + pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx; + pCtx->probeGrp.endIdx = pCtx->probeGrp.beginIdx; + + MJ_ERR_RET(mAsofJoinAddRowsToCache(pCtx, &pCtx->buildGrp, false)); + + return mAsofLowerDumpGrpCache(pCtx); +} + +int32_t mAsofLowerHandleGrpRemains(SMJoinWindowCtx* pCtx) { + return (pCtx->lastEqGrp) ? mAsofLowerDumpUpdateEqRows(pCtx, pCtx->pJoin, false) : mAsofLowerDumpGrpCache(pCtx); +} + +static bool mAsofLowerRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { + bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); + bool buildGot = false; + + do { + if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { + buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); + } + + if (!probeGot) { + mJoinSetDone(pOperator); + return false; } break; } while (true); + pCtx->probeGrp.blk = pJoin->probe->blk; + pCtx->buildGrp.blk = pJoin->build->blk; + return true; } -SSDataBlock* mAsofJoinDo(struct SOperatorInfo* pOperator) { +SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoin = pOperator->info; SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx; int32_t code = TSDB_CODE_SUCCESS; @@ -1756,7 +2072,7 @@ SSDataBlock* mAsofJoinDo(struct SOperatorInfo* pOperator) { blockDataCleanup(pCtx->finBlk); if (pCtx->grpRemains) { - MJ_ERR_JRET(mAsofJoinHandleGrpRemains(pCtx)); + MJ_ERR_JRET(mAsofLowerHandleGrpRemains(pCtx)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -1764,15 +2080,15 @@ SSDataBlock* mAsofJoinDo(struct SOperatorInfo* pOperator) { } do { - if (!mAsofJoinRetrieve(pOperator, pJoin, pCtx)) { + if (!mAsofLowerRetrieve(pOperator, pJoin, pCtx)) { break; } MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); - if (probeTs == pCtx->lastEqTs) { - MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true)); + if (probeTs == pCtx->lastTs) { + MJ_ERR_JRET(mAsofLowerProcessEqualGrp(pCtx, probeTs, true)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -1786,40 +2102,413 @@ SSDataBlock* mAsofJoinDo(struct SOperatorInfo* pOperator) { while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { if (probeTs == buildTs) { - pCtx->lastEqTs = probeTs; - MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false)); + pCtx->lastTs = probeTs; + MJ_ERR_JRET(mAsofLowerProcessEqualGrp(pCtx, probeTs, false)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); - } else if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) { - MJ_ERR_JRET(mJoinProcessUnreachGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { - return pCtx->finBlk; - } + continue; + } + + if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) { + MJ_ERR_JRET(mAsofLowerProcessLowerGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs)); } else { - while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { - MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build); - if (PROBE_TS_OVER(pCtx->ascTs, probeTs, buildTs)) { - continue; - } - - break; - } + MJ_ERR_JRET(mAsofLowerProcessGreaterGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs)); + } + + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; } } if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) { - pCtx->probeNEqGrp.blk = pJoin->probe->blk; - pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx; - pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx; - pCtx->probeNEqGrp.endIdx = pJoin->probe->blk->info.rows - 1; + pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx; + pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx; + pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1; + + MJ_ERR_JRET(mAsofLowerDumpGrpCache(pCtx)); + + pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; + + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + } + } while (true); + +_return: + + if (code) { + pJoin->errCode = code; + return NULL; + } + + return pCtx->finBlk; +} + + +int32_t mAsofGreaterDumpGrpCache(SMJoinWindowCtx* pCtx) { + int64_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; + SMJoinWinCache* cache = &pCtx->cache; + int32_t buildGrpNum = taosArrayGetSize(cache->grps); + int64_t buildTotalRows = (cache->rowNum > pCtx->jLimit) ? pCtx->jLimit : cache->rowNum; + if (buildGrpNum <= 0 || buildTotalRows <= 0) { + return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true); + } + + SMJoinGrpRows* probeGrp = &pCtx->probeGrp; + int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); + int32_t probeEndIdx = probeGrp->endIdx; + + if (0 == cache->grpIdx && probeRows * buildTotalRows <= rowsLeft) { + SMJoinGrpRows* pFirstBuild = taosArrayGet(cache->grps, 0); + if (pFirstBuild->readIdx == pFirstBuild->beginIdx) { + for (; cache->grpIdx < buildGrpNum; ++cache->grpIdx) { + SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, cache->grpIdx); + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); + buildGrp->readIdx = buildGrp->beginIdx; + } + + pCtx->grpRemains = false; + return TSDB_CODE_SUCCESS; + } + } + + for (; !GRP_DONE(probeGrp); ) { + probeGrp->endIdx = probeGrp->readIdx; + for (; cache->grpIdx < buildGrpNum && rowsLeft > 0; ++cache->grpIdx) { + SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, cache->grpIdx); + + if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) { + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); + rowsLeft -= GRP_REMAIN_ROWS(buildGrp); + buildGrp->readIdx = buildGrp->beginIdx; + continue; + } + + int32_t buildEndIdx = buildGrp->endIdx; + buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1; + mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp); + buildGrp->readIdx += rowsLeft; + buildGrp->endIdx = buildEndIdx; + rowsLeft = 0; + break; + } + probeGrp->endIdx = probeEndIdx; + + if (cache->grpIdx >= buildGrpNum) { + cache->grpIdx = 0; + ++probeGrp->readIdx; + } + + if (rowsLeft <= 0) { + break; + } + } + + probeGrp->endIdx = probeEndIdx; + + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + + return TSDB_CODE_SUCCESS; +} + + + +int32_t mAsofGreaterChkFillGrpCache(SMJoinWindowCtx* pCtx) { + if (pCtx->cache.rowNum >= pCtx->jLimit || pCtx->pJoin->build->dsFetchDone) { + return TSDB_CODE_SUCCESS; + } + + SMJoinTableCtx* build = pCtx->pJoin->build; + SMJoinWinCache* pCache = &pCtx->cache; + int32_t grpNum = taosArrayGetSize(pCache->grps); + ASSERT(grpNum >= 1 && grpNum <= 2); + + SSDataBlock* pBlk = NULL; + SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, grpNum - 1); + if (pGrp->blk != pCache->outBlk) { + pBlk = pGrp->blk; + taosArrayPop(pCache->grps); + } + + do { + if (NULL != pBlk) { + MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, pBlk, build->blkRowIdx, pBlk->info.rows - build->blkRowIdx)); + } + + build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, build->downStreamIdx); + qDebug("%s merge join %s table got block to fill grp, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0); + + build->blkRowIdx = 0; + + if (NULL == build->blk) { + build->dsFetchDone = true; + break; + } + + MJOIN_PUSH_BLK_TO_CACHE(pCache, build->blk); + pBlk = build->blk; + } while (pCache->rowNum < pCtx->jLimit); + + MJOIN_RESTORE_TB_BLK(pCache, build); + + return TSDB_CODE_SUCCESS; +} + +int32_t mAsofGreaterFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) { + if (!lastBuildGrp) { + MJ_ERR_RET(mAsofGreaterChkFillGrpCache(pCtx)); + } + + return mAsofGreaterDumpGrpCache(pCtx); +} + +int32_t mAsofGreaterSkipEqRows(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk) { + SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); + + if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) { + *wholeBlk = false; + return TSDB_CODE_SUCCESS; + } + + char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1); + if (timestamp != *(int64_t*)pEndVal) { + for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) { + char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx); + if (timestamp == *(int64_t*)pNextVal) { + continue; + } + + return TSDB_CODE_SUCCESS + } + } + + *wholeBlk = true; + + return TSDB_CODE_SUCCESS; +} + +int32_t mAsofGreaterSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) { + SMJoinWinCache* cache = &pCtx->cache; + int32_t grpNum = taosArrayGetSize(cache->grps); + SMJoinTableCtx* pTable = pCtx->pJoin->build; + bool wholeBlk = false; + + do { + do { + MJ_ERR_RET(mAsofGreaterSkipEqRows(pTable, timestamp, &wholeBlk)); + if (!wholeBlk) { + return TSDB_CODE_SUCCESS; + } + + MJOIN_POP_TB_BLK(cache); + MJOIN_RESTORE_TB_BLK(cache, pTable); + } while (!MJOIN_BUILD_TB_ROWS_DONE(pTable)); + + pTable->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pTable->downStreamIdx); + qDebug("%s merge join %s table got block to skip eq ts, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0); + + pTable->blkRowIdx = 0; + + if (NULL == pTable->blk) { + pTable->dsFetchDone = true; + return TSDB_CODE_SUCCESS; + } + + MJOIN_PUSH_BLK_TO_CACHE(cache, pTable->blk); + } while (true); + + return TSDB_CODE_SUCCESS; +} + + +int32_t mAsofGreaterUpdateDumpEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) { + if (!pCtx->asofEqRow && !lastBuildGrp) { + MJ_ERR_RET(mAsofGreaterSkipAllEqRows(pCtx, timestamp)); + } + + return mAsofGreaterFillDumpGrpCache(pCtx, lastBuildGrp); +} + + +int32_t mAsofGreaterProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) { + SMJoinOperatorInfo* pJoin = pCtx->pJoin; + + pCtx->lastEqGrp = true; + pCtx->cache.grpIdx = 0; + + MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, &pCtx->probeGrp)); + + return mAsofGreaterUpdateDumpEqRows(pCtx, timestamp, lastBuildGrp); +} + +int32_t mAsofGreaterProcessLowerGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { + pCtx->lastEqGrp = false; + + pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx; + pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx; + pCtx->probeGrp.endIdx = pCtx->probeGrp.beginIdx; + + while (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) { + MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pJoin->probe); + if (PROBE_TS_LOWER(pCtx->ascTs, *probeTs, *buildTs)) { + pCtx->probeGrp.endIdx = pJoin->probe->blkRowIdx; + continue; + } + + break; + } + + pCtx->cache.grpIdx = 0; + + return mAsofGreaterFillDumpGrpCache(pCtx, false); +} + +int32_t mAsofGreaterProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { + do { + MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pJoin->build); + if (!PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) { + break; + } + + pCtx->cache.rowNum--; + while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { + MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pJoin->build); + if (PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) { + pCtx->cache.rowNum--; + continue; + } + + return TSDB_CODE_SUCCESS; + } + + MJOIN_POP_TB_BLK(&pCtx->cache); + MJOIN_RESTORE_TB_BLK(&pCtx->cache, pJoin->build); + } while (!MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)); + + return TSDB_CODE_SUCCESS; +} + +static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { + bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); + bool buildGot = false; + + do { + if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { + buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); + } + + if (!probeGot) { + mJoinSetDone(pOperator); + return false; + } + + if (buildGot && pCtx->asofGreaterRow) { + SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot); + SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot); + if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) { + pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; + continue; + } + } + + break; + } while (true); + + if (buildGot) { + MJOIN_PUSH_BLK_TO_CACHE(&pCtx->cache, pJoin->build->blk); + MJOIN_RESTORE_TB_BLK(&pCtx->cache, pJoin->build); + } + + pCtx->probeGrp.blk = pJoin->probe->blk; + pCtx->buildGrp.blk = pJoin->build->blk; + + return true; +} + + +int32_t mAsofGreaterHandleGrpRemains(SMJoinWindowCtx* pCtx) { + return mAsofGreaterDumpGrpCache(pCtx); +} + + +SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) { + SMJoinOperatorInfo* pJoin = pOperator->info; + SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx; + int32_t code = TSDB_CODE_SUCCESS; + int64_t probeTs = 0; + int64_t buildTs = 0; + SColumnInfoData* pBuildCol = NULL; + SColumnInfoData* pProbeCol = NULL; + + blockDataCleanup(pCtx->finBlk); + + if (pCtx->grpRemains) { + MJ_ERR_JRET(mAsofGreaterHandleGrpRemains(pCtx)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + pCtx->grpRemains = false; + } + + do { + if (!mAsofGreaterRetrieve(pOperator, pJoin, pCtx)) { + break; + } + + MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); + MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); + + if (probeTs == pCtx->lastTs) { + MJ_ERR_JRET(mAsofGreaterProcessEqualGrp(pCtx, probeTs, true)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + + if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) { + continue; + } else { + MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); + } + } + + while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { + if (probeTs == buildTs) { + pCtx->lastTs = probeTs; + MJ_ERR_JRET(mAsofGreaterProcessEqualGrp(pCtx, probeTs, false)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + + MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); + MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); + continue; + } + + if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) { + MJ_ERR_JRET(mAsofGreaterProcessLowerGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs)); + } else { + MJ_ERR_JRET(mAsofGreaterProcessGreaterGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs)); + } + + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + } + + if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) { + pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx; + pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx; + pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1; + + MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true)); pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; - MJ_ERR_JRET(mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -1841,15 +2530,15 @@ int32_t mJoinInitWinCache(SMJoinWinCache* pCache, SMJoinOperatorInfo* pJoin, SMJ pCache->pageLimit = MJOIN_BLK_SIZE_LIMIT; pCache->colNum = pJoin->build->finNum; - pCache->blk = createOneDataBlock(pCtx->finBlk, false); - if (NULL == pCache->blk) { + pCache->outBlk = createOneDataBlock(pCtx->finBlk, false); + if (NULL == pCache->outBlk) { return TSDB_CODE_OUT_OF_MEMORY; } - pCache->blk->info.capacity = pCtx->jLimit; + pCache->outBlk->info.capacity = pCtx->jLimit; SMJoinTableCtx* build = pJoin->build; for (int32_t i = 0; i < pCache->colNum; ++i) { - SColumnInfoData* pCol = taosArrayGet(pCache->blk->pDataBlock, build->finCols[i].dstSlot); + SColumnInfoData* pCol = taosArrayGet(pCache->outBlk->pDataBlock, build->finCols[i].dstSlot); doEnsureCapacity(pCol, NULL, pCtx->jLimit, false); } @@ -1886,6 +2575,12 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ pCtx->pJoin = pJoin; pCtx->lastEqTs = INT64_MIN; pCtx->hashCan = pJoin->probe->keyNum > 0; + if (JOIN_STYPE_ASOF == pJoin->subType) { + pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1; + pJoin->subType = JOIN_STYPE_OUTER; + } else { + pCtx->jLimit = -1; + } if (pJoinNode->node.inputTsOrder != ORDER_DESC) { pCtx->ascTs = true; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 1a5fcf44ab..14b7c8fde1 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -419,7 +419,7 @@ int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool app } -int32_t mJoinNonEqCart(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp) { +int32_t mJoinNonEqCart(SMJoinCommonCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp) { pCtx->lastEqGrp = false; pCtx->lastProbeGrp = probeGrp; @@ -596,7 +596,7 @@ int32_t mJoinProcessUnreachGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColum while (++pTb->blkRowIdx < pTb->blk->info.rows) { MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pTb); - if (PROBE_TS_UNREACH(pCtx->ascTs, *probeTs, *buildTs)) { + if (PROBE_TS_LOWER(pCtx->ascTs, *probeTs, *buildTs)) { pCtx->probeNEqGrp.endIdx = pTb->blkRowIdx; continue; } @@ -607,7 +607,7 @@ int32_t mJoinProcessUnreachGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColum return mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true); } -int32_t mJoinProcessOverGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { +int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { pCtx->buildNEqGrp.blk = pTb->blk; pCtx->buildNEqGrp.beginIdx = pTb->blkRowIdx; pCtx->buildNEqGrp.readIdx = pCtx->buildNEqGrp.beginIdx; @@ -615,7 +615,7 @@ int32_t mJoinProcessOverGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnIn while (++pTb->blkRowIdx < pTb->blk->info.rows) { MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pTb); - if (PROBE_TS_OVER(pCtx->ascTs, *probeTs, *buildTs)) { + if (PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) { pCtx->buildNEqGrp.endIdx = pTb->blkRowIdx; continue; } @@ -765,6 +765,9 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi pTable->noKeepEqGrpRows = (JOIN_STYPE_ANTI == pJoin->subType && NULL == pJoin->pFPreFilter); pTable->multiEqGrpRows = !((JOIN_STYPE_SEMI == pJoin->subType || JOIN_STYPE_ANTI == pJoin->subType) && NULL == pJoin->pFPreFilter); pTable->multiRowsGrp = !((JOIN_STYPE_SEMI == pJoin->subType || JOIN_STYPE_ANTI == pJoin->subType) && NULL == pJoin->pPreFilter); + if (JOIN_STYPE_ASOF == pJoin->subType) { + pTable->eqRowLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1; + } } else { pTable->multiEqGrpRows = true; } @@ -808,8 +811,9 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin } static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { - if (JOIN_STYPE_ASOF == pJoin->subType || JOIN_STYPE_WIN == pJoin->subType) { - //return mJoinInitWindowCtx(pJoin, pJoinNode); + if ((JOIN_STYPE_ASOF == pJoin->subType && (ASOF_LOWER_ROW_INCLUDED(pJoinNode->asofOpType) || ASOF_GREATER_ROW_INCLUDED(pJoinNode->asofOpType))) + || JOIN_STYPE_WIN == pJoin->subType) { + return mJoinInitWindowCtx(pJoin, pJoinNode); } return mJoinInitMergeCtx(pJoin, pJoinNode); @@ -880,6 +884,7 @@ int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t void mJoinResetForBuildTable(SMJoinTableCtx* pTable) { pTable->grpTotalRows = 0; pTable->grpIdx = 0; + pTable->eqRowNum = 0; mJoinDestroyCreatedBlks(pTable->createdBlks); taosArrayClear(pTable->eqGrps); if (pTable->rowBitmapSize > 0) { @@ -900,6 +905,7 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol mJoinResetForBuildTable(pTable); } + bool keepGrp = true; pGrp = taosArrayReserve(pTable->eqGrps, 1); pGrp->beginIdx = pTable->blkRowIdx++; @@ -927,6 +933,14 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol if (!pTable->multiEqGrpRows) { pGrp->endIdx = pGrp->beginIdx; + } else if (0 == pTable->eqRowLimit) { + // DO NOTHING + } else if (pTable->eqRowLimit == pTable->eqRowNum) { + keepGrp = false; + } else { + int64_t rowNum = TMIN(pGrp->endIdx - pGrp->beginIdx + 1, pTable->eqRowLimit - pTable->eqRowNum); + pGrp->endIdx = pGrp->beginIdx + rowNum - 1; + pTable->eqRowNum += rowNum; } goto _return; @@ -936,28 +950,47 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol if (wholeBlk && (pTable->multiEqGrpRows || restart)) { *wholeBlk = true; - if (pTable->noKeepEqGrpRows) { + if (pTable->noKeepEqGrpRows || !keepGrp) { goto _return; } - if (0 == pGrp->beginIdx && pTable->multiEqGrpRows) { + if (0 == pGrp->beginIdx && pTable->multiEqGrpRows && 0 == pTable->eqRowLimit) { pGrp->blk = createOneDataBlock(pTable->blk, true); + taosArrayPush(pTable->createdBlks, &pGrp->blk); } else { if (!pTable->multiEqGrpRows) { pGrp->endIdx = pGrp->beginIdx; } - pGrp->blk = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, pGrp->endIdx - pGrp->beginIdx + 1); - pGrp->endIdx -= pGrp->beginIdx; - pGrp->beginIdx = 0; - pGrp->readIdx = 0; + int64_t rowNum = 0; + if (!pTable->multiEqGrpRows) { + rowNum = 1; + pGrp->endIdx = pGrp->beginIdx; + } else if (0 == pTable->eqRowLimit) { + rowNum = pGrp->endIdx - pGrp->beginIdx + 1; + } else if (pTable->eqRowLimit == pTable->eqRowNum) { + keepGrp = false; + } else { + rowNum = TMIN(pGrp->endIdx - pGrp->beginIdx + 1, pTable->eqRowLimit - pTable->eqRowNum); + pGrp->endIdx = pGrp->beginIdx + rowNum - 1; + } + + if (keepGrp && rowNum > 0) { + pTable->eqRowNum += rowNum; + + pGrp->blk = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, rowNum); + pGrp->endIdx -= pGrp->beginIdx; + pGrp->beginIdx = 0; + pGrp->readIdx = 0; + taosArrayPush(pTable->createdBlks, &pGrp->blk); + } } - taosArrayPush(pTable->createdBlks, &pGrp->blk); + } _return: - if (pTable->noKeepEqGrpRows || (!pTable->multiEqGrpRows && !restart)) { + if (pTable->noKeepEqGrpRows || !keepGrp || (!pTable->multiEqGrpRows && !restart)) { taosArrayPop(pTable->eqGrps); } else { pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1;