From 3f36f7ea520e560b550e1b8cc055e2df55203522 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 8 Mar 2024 16:23:41 +0800 Subject: [PATCH] enh: support desc join --- source/libs/executor/inc/mergejoin.h | 10 +- source/libs/executor/src/mergejoin.c | 631 +++++++++++++------ source/libs/executor/src/mergejoinoperator.c | 4 +- source/libs/executor/test/joinTests.cpp | 127 +++- tests/script/tsim/join/inner_join.sim | 35 + tests/script/tsim/join/left_anti_join.sim | 33 + tests/script/tsim/join/left_asof_join.sim | 182 ++++++ tests/script/tsim/join/left_join.sim | 46 ++ tests/script/tsim/join/left_semi_join.sim | 35 + tests/script/tsim/join/left_win_join.sim | 316 ++++++++++ 10 files changed, 1207 insertions(+), 212 deletions(-) diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 7a72c064a6..b8d99047d7 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -230,6 +230,8 @@ typedef struct SMJoinWinCache { SSDataBlock* outBlk; } SMJoinWinCache; +typedef int32_t (*joinMoveWin)(void*); + typedef struct SMJoinWindowCtx { // KEEP IT FIRST struct SMJoinOperatorInfo* pJoin; @@ -250,9 +252,12 @@ typedef struct SMJoinWindowCtx { bool lowerRowsAcq; bool eqRowsAcq; bool greaterRowsAcq; + bool forwardRowsAcq; int64_t winBeginTs; int64_t winEndTs; + joinMoveWin moveWinBeginFp; + joinMoveWin moveWinEndFp; bool eqPostDone; int64_t lastTs; SMJoinGrpRows probeGrp; @@ -321,8 +326,9 @@ typedef struct SMJoinOperatorInfo { #define SET_SAME_TS_GRP_HJOIN(_pair, _octx) ((_pair)->hashJoin = (_octx)->hashCan && REACH_HJOIN_THRESHOLD(_pair)) -#define PROBE_TS_LOWER(_order, _pts, _bts) ((_order) && (_pts) < (_bts)) || (!(_order) && (_pts) > (_bts)) -#define PROBE_TS_GREATER(_order, _pts, _bts) ((_order) && (_pts) > (_bts)) || (!(_order) && (_pts) < (_bts)) +#define PROBE_TS_NMATCH(_asc, _pts, _bts) (((_asc) && (_pts) < (_bts)) || (!(_asc) && (_pts) > (_bts))) +#define PROBE_TS_NREACH(_asc, _pts, _bts) (((_asc) && (_pts) > (_bts)) || (!(_asc) && (_pts) < (_bts))) +#define MJOIN_BUILD_BLK_OOR(_asc, _pts, _pidx, _bts, _bnum) (((_asc) && (*((int64_t*)(_pts) + (_pidx)) > *((int64_t*)(_bts) + (_bnum) - 1))) || ((!(_asc)) && (*((int64_t*)(_pts) + (_pidx)) < *((int64_t*)(_bts) + (_bnum) - 1)))) #define GRP_REMAIN_ROWS(_grp) ((_grp)->endIdx - (_grp)->readIdx + 1) #define GRP_DONE(_grp) ((_grp)->readIdx > (_grp)->endIdx) diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index 4e12233e16..d35b194f7e 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -107,7 +107,6 @@ int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) { return TSDB_CODE_SUCCESS; } - static int32_t mOuterJoinHashFullCart(SMJoinMergeCtx* pCtx) { SMJoinTableCtx* probe = pCtx->pJoin->probe; SMJoinTableCtx* build = pCtx->pJoin->build; @@ -482,8 +481,9 @@ static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoi if (buildGot) { SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId); SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId); - if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) { + if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) { pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; + buildGot = false; continue; } } @@ -568,7 +568,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); - } else if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) { + } else if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) { MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; @@ -576,7 +576,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { } else { while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build); - if (PROBE_TS_GREATER(pCtx->ascTs, probeTs, buildTs)) { + if (PROBE_TS_NREACH(pCtx->ascTs, probeTs, buildTs)) { continue; } @@ -736,18 +736,32 @@ static FORCE_INLINE int32_t mInnerJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { } -static bool mInnerJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { +static bool mInnerJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool buildGot = false; - if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { - buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); - } - - if (!probeGot) { - mJoinSetDone(pOperator); - return false; - } + do { + if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { + buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); + } + + if (!probeGot) { + mJoinSetDone(pOperator); + return false; + } + + if (buildGot) { + SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId); + SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId); + if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) { + pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; + buildGot = false; + continue; + } + } + + break; + } while (true); return true; } @@ -773,7 +787,7 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) { } do { - if (!mInnerJoinRetrieve(pOperator, pJoin)) { + if (!mInnerJoinRetrieve(pOperator, pJoin, pCtx)) { break; } @@ -813,7 +827,7 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) { continue; } - if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) { + if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) { if (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) { MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); continue; @@ -1176,7 +1190,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { continue; } - if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) { + if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) { MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs)); } else { MJ_ERR_JRET(mJoinProcessGreaterGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs)); @@ -1472,7 +1486,7 @@ SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator) { } do { - if (!mInnerJoinRetrieve(pOperator, pJoin)) { + if (!mInnerJoinRetrieve(pOperator, pJoin, pCtx)) { break; } @@ -1512,7 +1526,7 @@ SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator) { continue; } - if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) { + if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) { if (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) { MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); continue; @@ -1773,7 +1787,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) { MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); - } else if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) { + } else if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) { MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; @@ -1781,7 +1795,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) { } else { while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build); - if (PROBE_TS_GREATER(pCtx->ascTs, probeTs, buildTs)) { + if (PROBE_TS_NREACH(pCtx->ascTs, probeTs, buildTs)) { continue; } @@ -1816,7 +1830,7 @@ _return: } -int32_t mAsofLowerCalcRowNum(SMJoinWinCache* pCache, int64_t jLimit, int32_t newRows, int32_t* evictRows) { +int32_t mAsofBackwardCalcRowNum(SMJoinWinCache* pCache, int64_t jLimit, int32_t newRows, int32_t* evictRows) { if (pCache->outBlk->info.rows <= 0) { *evictRows = 0; return TMIN(jLimit, newRows); @@ -1836,10 +1850,10 @@ int32_t mAsofLowerCalcRowNum(SMJoinWinCache* pCache, int64_t jLimit, int32_t new return newRows; } -int32_t mAsofLowerAddRowsToCache(SMJoinWindowCtx* pCtx, SMJoinGrpRows* pGrp, bool fromBegin) { +int32_t mAsofBackwardAddRowsToCache(SMJoinWindowCtx* pCtx, SMJoinGrpRows* pGrp, bool fromBegin) { int32_t evictRows = 0; SMJoinWinCache* pCache = &pCtx->cache; - int32_t rows = mAsofLowerCalcRowNum(pCache, pCtx->jLimit, pGrp->endIdx - pGrp->beginIdx + 1, &evictRows); + int32_t rows = mAsofBackwardCalcRowNum(pCache, pCtx->jLimit, pGrp->endIdx - pGrp->beginIdx + 1, &evictRows); if (evictRows > 0) { MJ_ERR_RET(blockDataTrimFirstRows(pCache->outBlk, evictRows)); } @@ -1849,7 +1863,7 @@ int32_t mAsofLowerAddRowsToCache(SMJoinWindowCtx* pCtx, SMJoinGrpRows* pGrp, boo } -int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp) { +int32_t mAsofBackwardAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp) { int64_t eqRowsNum = 0; SMJoinGrpRows grp; @@ -1883,7 +1897,7 @@ int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindow if (eqRowsNum < pCtx->jLimit) { grp.endIdx = grp.beginIdx + TMIN(grp.endIdx - grp.beginIdx + 1, pCtx->jLimit - eqRowsNum) - 1; - MJ_ERR_RET(mAsofLowerAddRowsToCache(pCtx, &grp, true)); + MJ_ERR_RET(mAsofBackwardAddRowsToCache(pCtx, &grp, true)); } eqRowsNum += grp.endIdx - grp.beginIdx + 1; @@ -1906,7 +1920,7 @@ int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindow return TSDB_CODE_SUCCESS; } -int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) { +int32_t mAsofBackwardDumpGrpCache(SMJoinWindowCtx* pCtx) { if (NULL == pCtx->cache.outBlk || pCtx->cache.outBlk->info.rows <= 0) { return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, false); } @@ -1980,9 +1994,9 @@ int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) { return TSDB_CODE_SUCCESS; } -int32_t mAsofLowerDumpUpdateEqRows(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, bool lastBuildGrp, bool skipEqPost) { +int32_t mAsofBackwardDumpUpdateEqRows(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, bool lastBuildGrp, bool skipEqPost) { if (!pCtx->eqRowsAcq) { - MJ_ERR_RET(mAsofLowerDumpGrpCache(pCtx)); + MJ_ERR_RET(mAsofBackwardDumpGrpCache(pCtx)); pCtx->lastEqGrp = true; if (pCtx->grpRemains) { @@ -1992,21 +2006,21 @@ int32_t mAsofLowerDumpUpdateEqRows(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJ if (!pCtx->eqPostDone && !lastBuildGrp && (pCtx->eqRowsAcq || !skipEqPost)) { pCtx->eqPostDone = true; - MJ_ERR_RET(mAsofLowerAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs)); + MJ_ERR_RET(mAsofBackwardAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs)); } if (!pCtx->eqRowsAcq) { return TSDB_CODE_SUCCESS; } - MJ_ERR_RET(mAsofLowerDumpGrpCache(pCtx)); + MJ_ERR_RET(mAsofBackwardDumpGrpCache(pCtx)); pCtx->lastEqGrp = true; return TSDB_CODE_SUCCESS; } -int32_t mAsofLowerProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) { +int32_t mAsofBackwardProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) { SMJoinOperatorInfo* pJoin = pCtx->pJoin; if (!lastBuildGrp) { @@ -2016,13 +2030,13 @@ int32_t mAsofLowerProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool bool wholeBlk = false; MJ_ERR_RET(mJoinBuildEqGrp(pJoin->probe, timestamp, &wholeBlk, &pCtx->probeGrp)); - MJ_ERR_RET(mAsofLowerDumpUpdateEqRows(pCtx, pJoin, lastBuildGrp, wholeBlk)); + MJ_ERR_RET(mAsofBackwardDumpUpdateEqRows(pCtx, pJoin, lastBuildGrp, wholeBlk)); return TSDB_CODE_SUCCESS; } -int32_t mAsofLowerProcessLowerGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { +int32_t mAsofBackwardHandleClosedGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { pCtx->lastEqGrp = false; pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx; @@ -2031,7 +2045,7 @@ int32_t mAsofLowerProcessLowerGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJo while (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) { MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pJoin->probe); - if (PROBE_TS_LOWER(pCtx->ascTs, *probeTs, *buildTs)) { + if (PROBE_TS_NMATCH(pCtx->ascTs, *probeTs, *buildTs)) { pCtx->probeGrp.endIdx = pJoin->probe->blkRowIdx; continue; } @@ -2039,10 +2053,10 @@ int32_t mAsofLowerProcessLowerGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJo break; } - return mAsofLowerDumpGrpCache(pCtx); + return mAsofBackwardDumpGrpCache(pCtx); } -int32_t mAsofLowerProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { +int32_t mAsofBackwardHandleUnclosedGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { pCtx->lastEqGrp = false; pCtx->buildGrp.beginIdx = pJoin->build->blkRowIdx; @@ -2051,7 +2065,7 @@ int32_t mAsofLowerProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* p while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pJoin->build); - if (PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) { + if (PROBE_TS_NREACH(pCtx->ascTs, *probeTs, *buildTs)) { pCtx->buildGrp.endIdx = pJoin->build->blkRowIdx; continue; } @@ -2063,14 +2077,14 @@ int32_t mAsofLowerProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* p pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx; pCtx->probeGrp.endIdx = pCtx->probeGrp.beginIdx; - return mAsofLowerAddRowsToCache(pCtx, &pCtx->buildGrp, false); + return mAsofBackwardAddRowsToCache(pCtx, &pCtx->buildGrp, false); } -int32_t mAsofLowerHandleGrpRemains(SMJoinWindowCtx* pCtx) { - return (pCtx->lastEqGrp) ? mAsofLowerDumpUpdateEqRows(pCtx, pCtx->pJoin, false, true) : mAsofLowerDumpGrpCache(pCtx); +int32_t mAsofBackwardHandleGrpRemains(SMJoinWindowCtx* pCtx) { + return (pCtx->lastEqGrp) ? mAsofBackwardDumpUpdateEqRows(pCtx, pCtx->pJoin, false, true) : mAsofBackwardDumpGrpCache(pCtx); } -static bool mAsofLowerRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { +static bool mAsofBackwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool buildGot = false; @@ -2102,7 +2116,7 @@ static bool mAsofLowerRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJo } -SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) { +SSDataBlock* mAsofBackwardJoinDo(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoin = pOperator->info; SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx; int32_t code = TSDB_CODE_SUCCESS; @@ -2114,7 +2128,7 @@ SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) { blockDataCleanup(pCtx->finBlk); if (pCtx->grpRemains) { - MJ_ERR_JRET(mAsofLowerHandleGrpRemains(pCtx)); + MJ_ERR_JRET(mAsofBackwardHandleGrpRemains(pCtx)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -2122,7 +2136,7 @@ SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) { } do { - if (!mAsofLowerRetrieve(pOperator, pJoin, pCtx)) { + if (!mAsofBackwardRetrieve(pOperator, pJoin, pCtx)) { if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) { continue; } @@ -2134,7 +2148,7 @@ SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) { MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); if (probeTs == pCtx->lastTs) { - MJ_ERR_JRET(mAsofLowerProcessEqualGrp(pCtx, probeTs, true)); + MJ_ERR_JRET(mAsofBackwardProcessEqualGrp(pCtx, probeTs, true)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -2148,14 +2162,14 @@ SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) { if (pCtx->lastEqGrp && !pCtx->eqPostDone) { pCtx->eqPostDone = true; - MJ_ERR_JRET(mAsofLowerAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs)); + MJ_ERR_JRET(mAsofBackwardAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs)); MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); } while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { if (probeTs == buildTs) { pCtx->lastTs = probeTs; - MJ_ERR_JRET(mAsofLowerProcessEqualGrp(pCtx, probeTs, false)); + MJ_ERR_JRET(mAsofBackwardProcessEqualGrp(pCtx, probeTs, false)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -2165,10 +2179,10 @@ SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) { continue; } - if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) { - MJ_ERR_JRET(mAsofLowerProcessLowerGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs)); + if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) { + MJ_ERR_JRET(mAsofBackwardHandleClosedGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs)); } else { - MJ_ERR_JRET(mAsofLowerProcessGreaterGrp(pCtx, pJoin, pBuildCol, &probeTs, &buildTs)); + MJ_ERR_JRET(mAsofBackwardHandleUnclosedGrp(pCtx, pJoin, pBuildCol, &probeTs, &buildTs)); } if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { @@ -2181,7 +2195,7 @@ SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) { pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx; pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1; - MJ_ERR_JRET(mAsofLowerDumpGrpCache(pCtx)); + MJ_ERR_JRET(mAsofBackwardDumpGrpCache(pCtx)); pCtx->lastEqGrp = false; pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; @@ -2202,7 +2216,7 @@ _return: return pCtx->finBlk; } -int32_t mAsofGreaterTrimCacheBlk(SMJoinWindowCtx* pCtx) { +int32_t mAsofForwardTrimCacheBlk(SMJoinWindowCtx* pCtx) { if (taosArrayGetSize(pCtx->cache.grps) <= 0) { return TSDB_CODE_SUCCESS; } @@ -2218,12 +2232,12 @@ int32_t mAsofGreaterTrimCacheBlk(SMJoinWindowCtx* pCtx) { return TSDB_CODE_SUCCESS; } -int32_t mAsofGreaterChkFillGrpCache(SMJoinWindowCtx* pCtx) { +int32_t mAsofForwardChkFillGrpCache(SMJoinWindowCtx* pCtx) { if (pCtx->cache.rowNum >= pCtx->jLimit || pCtx->pJoin->build->dsFetchDone) { return TSDB_CODE_SUCCESS; } - MJ_ERR_RET(mAsofGreaterTrimCacheBlk(pCtx)); + MJ_ERR_RET(mAsofForwardTrimCacheBlk(pCtx)); SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinWinCache* pCache = &pCtx->cache; @@ -2279,7 +2293,7 @@ int32_t mAsofGreaterChkFillGrpCache(SMJoinWindowCtx* pCtx) { return TSDB_CODE_SUCCESS; } -void mAsofGreaterUpdateBuildGrpEndIdx(SMJoinWindowCtx* pCtx) { +void mAsofForwardUpdateBuildGrpEndIdx(SMJoinWindowCtx* pCtx) { int32_t grpNum = taosArrayGetSize(pCtx->cache.grps); if (grpNum <= 0) { return; @@ -2300,18 +2314,18 @@ void mAsofGreaterUpdateBuildGrpEndIdx(SMJoinWindowCtx* pCtx) { pGrp->endIdx = pGrp->beginIdx + TMIN(pGrp->blk->info.rows, remainRows) - 1; } -int32_t mAsofGreaterFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) { +int32_t mAsofForwardFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) { if (!lastBuildGrp) { MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build); - MJ_ERR_RET(mAsofGreaterChkFillGrpCache(pCtx)); + MJ_ERR_RET(mAsofForwardChkFillGrpCache(pCtx)); } - mAsofGreaterUpdateBuildGrpEndIdx(pCtx); + mAsofForwardUpdateBuildGrpEndIdx(pCtx); return mWinJoinDumpGrpCache(pCtx); } -int32_t mAsofGreaterSkipEqRows(SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk) { +int32_t mAsofForwardSkipEqRows(SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk) { SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId); if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) { @@ -2343,14 +2357,14 @@ int32_t mAsofGreaterSkipEqRows(SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, in return TSDB_CODE_SUCCESS; } -int32_t mAsofGreaterSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) { +int32_t mAsofForwardSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) { SMJoinWinCache* cache = &pCtx->cache; SMJoinTableCtx* pTable = pCtx->pJoin->build; bool wholeBlk = false; do { do { - MJ_ERR_RET(mAsofGreaterSkipEqRows(pCtx, pTable, timestamp, &wholeBlk)); + MJ_ERR_RET(mAsofForwardSkipEqRows(pCtx, pTable, timestamp, &wholeBlk)); if (!wholeBlk) { return TSDB_CODE_SUCCESS; } @@ -2382,26 +2396,26 @@ int32_t mAsofGreaterSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) { } -int32_t mAsofGreaterUpdateDumpEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) { +int32_t mAsofForwardUpdateDumpEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) { if (!pCtx->eqRowsAcq && !lastBuildGrp) { - MJ_ERR_RET(mAsofGreaterSkipAllEqRows(pCtx, timestamp)); + MJ_ERR_RET(mAsofForwardSkipAllEqRows(pCtx, timestamp)); } - return mAsofGreaterFillDumpGrpCache(pCtx, lastBuildGrp); + return mAsofForwardFillDumpGrpCache(pCtx, lastBuildGrp); } -int32_t mAsofGreaterProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) { +int32_t mAsofForwardProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) { SMJoinOperatorInfo* pJoin = pCtx->pJoin; pCtx->lastEqGrp = true; MJ_ERR_RET(mJoinBuildEqGrp(pJoin->probe, timestamp, NULL, &pCtx->probeGrp)); - return mAsofGreaterUpdateDumpEqRows(pCtx, timestamp, lastBuildGrp); + return mAsofForwardUpdateDumpEqRows(pCtx, timestamp, lastBuildGrp); } -int32_t mAsofGreaterProcessLowerGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { +int32_t mAsofForwardHandleProbeGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { pCtx->lastEqGrp = false; pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx; @@ -2410,7 +2424,7 @@ int32_t mAsofGreaterProcessLowerGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* p while (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) { MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pJoin->probe); - if (PROBE_TS_LOWER(pCtx->ascTs, *probeTs, *buildTs)) { + if (PROBE_TS_NMATCH(pCtx->ascTs, *probeTs, *buildTs)) { pCtx->probeGrp.endIdx = pJoin->probe->blkRowIdx; continue; } @@ -2418,20 +2432,20 @@ int32_t mAsofGreaterProcessLowerGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* p break; } - return mAsofGreaterFillDumpGrpCache(pCtx, false); + return mAsofForwardFillDumpGrpCache(pCtx, false); } -int32_t mAsofGreaterProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData** pCol, int64_t* probeTs, int64_t* buildTs) { +int32_t mAsofForwardSkipBuildGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData** pCol, int64_t* probeTs, int64_t* buildTs) { do { MJOIN_GET_TB_CUR_TS(*pCol, *buildTs, pJoin->build); - if (!PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) { + if (!PROBE_TS_NREACH(pCtx->ascTs, *probeTs, *buildTs)) { break; } pCtx->cache.rowNum--; while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { MJOIN_GET_TB_CUR_TS(*pCol, *buildTs, pJoin->build); - if (PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) { + if (PROBE_TS_NREACH(pCtx->ascTs, *probeTs, *buildTs)) { pCtx->cache.rowNum--; continue; } @@ -2447,7 +2461,7 @@ int32_t mAsofGreaterProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* return TSDB_CODE_SUCCESS; } -static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { +static bool mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool buildGot = false; @@ -2470,9 +2484,10 @@ static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* p if (buildGot) { SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId); SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId); - if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) { + if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) { pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; MJOIN_POP_TB_BLK(&pCtx->cache); + buildGot = false; continue; } } @@ -2496,7 +2511,7 @@ static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* p } -SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) { +SSDataBlock* mAsofForwardJoinDo(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoin = pOperator->info; SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx; int32_t code = TSDB_CODE_SUCCESS; @@ -2516,7 +2531,7 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) { } do { - if (!mAsofGreaterRetrieve(pOperator, pJoin, pCtx)) { + if (!mAsofForwardRetrieve(pOperator, pJoin, pCtx)) { if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) { continue; } @@ -2528,7 +2543,7 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) { MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); if (probeTs == pCtx->lastTs) { - MJ_ERR_JRET(mAsofGreaterProcessEqualGrp(pCtx, probeTs, true)); + MJ_ERR_JRET(mAsofForwardProcessEqualGrp(pCtx, probeTs, true)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -2543,7 +2558,7 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) { while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { if (probeTs == buildTs) { pCtx->lastTs = probeTs; - MJ_ERR_JRET(mAsofGreaterProcessEqualGrp(pCtx, probeTs, false)); + MJ_ERR_JRET(mAsofForwardProcessEqualGrp(pCtx, probeTs, false)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -2553,10 +2568,10 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) { continue; } - if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) { - MJ_ERR_JRET(mAsofGreaterProcessLowerGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs)); + if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) { + MJ_ERR_JRET(mAsofForwardHandleProbeGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs)); } else { - MJ_ERR_JRET(mAsofGreaterProcessGreaterGrp(pCtx, pJoin, &pBuildCol, &probeTs, &buildTs)); + MJ_ERR_JRET(mAsofForwardSkipBuildGrp(pCtx, pJoin, &pBuildCol, &probeTs, &buildTs)); } if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { @@ -2604,6 +2619,14 @@ void mAsofJoinGroupReset(SMJoinOperatorInfo* pJoin) { mJoinResetGroupTableCtx(pJoin->build); } +static FORCE_INLINE void mWinJoinPopFrontGroup(SMJoinWindowCtx* pCtx, SMJoinGrpRows* pGrp) { + pCtx->cache.rowNum -= (pGrp->blk->info.rows - pGrp->beginIdx); + if (pGrp->blk == pCtx->cache.outBlk) { + blockDataCleanup(pGrp->blk); + } + + taosArrayPopFrontBatch(pCtx->cache.grps, 1); +} static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) { SMJoinWinCache* pCache = &pCtx->cache; @@ -2650,11 +2673,12 @@ static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin return false; } - if (buildGot && !pCtx->lowerRowsAcq) { + if (buildGot && pCtx->forwardRowsAcq) { SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId); SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId); - if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) { + if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) { pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; + buildGot = false; continue; } } @@ -2667,106 +2691,137 @@ static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin return true; } -int32_t mWinJoinAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) { +int32_t mWinJoinTryAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) { SSDataBlock* pBlk = build->blk; SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCtx.targetSlotId); - if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) >= pCtx->winBeginTs) { + if (pCtx->ascTs) { + if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) < pCtx->winBeginTs) { + *winEnd = false; + build->blk = NULL; + goto _return; + } + + if (*(int64_t*)pCol->pData > pCtx->winEndTs) { + *winEnd = true; + goto _return; + } + for (; build->blkRowIdx < pBlk->info.rows; build->blkRowIdx++) { if (*((int64_t*)pCol->pData + build->blkRowIdx) < pCtx->winBeginTs) { continue; } - + if (*((int64_t*)pCol->pData + build->blkRowIdx) <= pCtx->winEndTs) { SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx}; SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp); - + pGrp->readIdx = pGrp->beginIdx; pGrp->endIdx = pGrp->beginIdx; - + build->blk = NULL; pCache->rowNum = 1; } else { pCache->rowNum = 0; } - + *winEnd = true; return TSDB_CODE_SUCCESS; } + + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; } + + if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) > pCtx->winEndTs) { + *winEnd = false; + build->blk = NULL; + goto _return; + } + + if (*(int64_t*)pCol->pData < pCtx->winBeginTs) { + *winEnd = true; + goto _return; + } + + for (; build->blkRowIdx < pBlk->info.rows; build->blkRowIdx++) { + if (*((int64_t*)pCol->pData + build->blkRowIdx) > pCtx->winEndTs) { + continue; + } + + if (*((int64_t*)pCol->pData + build->blkRowIdx) >= pCtx->winBeginTs) { + SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx}; + SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp); + + pGrp->readIdx = pGrp->beginIdx; + pGrp->endIdx = pGrp->beginIdx; + + build->blk = NULL; + pCache->rowNum = 1; + } else { + pCache->rowNum = 0; + } + + *winEnd = true; + return TSDB_CODE_SUCCESS; + } + + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + +_return: pCache->rowNum = 0; - *winEnd = false; return TSDB_CODE_SUCCESS; } -int32_t mWinJoinAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) { - SSDataBlock* pBlk = build->blk; - SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCtx.targetSlotId); - SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx}; - if (*((int64_t*)pCol->pData + build->blkRowIdx) > pCtx->winEndTs) { - *winEnd = true; - return TSDB_CODE_SUCCESS; - } - - if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) <= pCtx->winEndTs) { - SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp); - - pGrp->readIdx = pGrp->beginIdx; - pGrp->endIdx = pBlk->info.rows - 1; - - pCache->rowNum += (pGrp->endIdx - pGrp->beginIdx + 1); - if (pCache->rowNum >= pCtx->jLimit) { - pGrp->endIdx = pBlk->info.rows - 1 + pCtx->jLimit - pCache->rowNum; - pCache->rowNum = pCtx->jLimit; - - build->blk = NULL; - *winEnd = true; +int32_t mWinJoinAddWinBeginBlk(SMJoinWindowCtx* pCtx) { + SMJoinWinCache* pCache = &pCtx->cache; + SMJoinTableCtx* build = pCtx->pJoin->build; + bool winEnd = false; + if (NULL != build->blk) { + MJ_ERR_RET(mWinJoinTryAddWinBeginBlk(pCtx, &pCtx->cache, build, &winEnd)); + if (winEnd || taosArrayGetSize(pCache->grps) > 0) { return TSDB_CODE_SUCCESS; } - - build->blk = NULL; - *winEnd = false; - return TSDB_CODE_SUCCESS; } - for (; build->blkRowIdx < pBlk->info.rows && pCache->rowNum < pCtx->jLimit; build->blkRowIdx++) { - if (*((int64_t*)pCol->pData + build->blkRowIdx) <= pCtx->winEndTs) { - pCache->rowNum++; - continue; + if (build->dsFetchDone) { + goto _return; + } + + do { + build->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, pCtx->pJoin->build); + qDebug("%s merge join %s table got block to start win, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0); + + build->blkRowIdx = 0; + + if (NULL == build->blk) { + break; } - break; - } + MJ_ERR_RET(mWinJoinTryAddWinBeginBlk(pCtx, &pCtx->cache, build, &winEnd)); + if (winEnd || taosArrayGetSize(pCache->grps) > 0) { + return TSDB_CODE_SUCCESS; + } + } while (true); - SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp); - - pGrp->readIdx = pGrp->beginIdx; - pGrp->endIdx = build->blkRowIdx - 1; - - build->blk = NULL; - *winEnd = true; +_return: return TSDB_CODE_SUCCESS; } -int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) { +int32_t mWinJoinMoveAscWinBegin(SMJoinWindowCtx* pCtx) { SMJoinWinCache* pCache = &pCtx->cache; + do { int32_t grpNum = taosArrayGetSize(pCache->grps); for (int32_t i = 0; i < grpNum; ++i) { SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i); SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId); if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) < pCtx->winBeginTs) { - pCache->rowNum -= (pGrp->blk->info.rows - pGrp->beginIdx); - if (pGrp->blk == pCache->outBlk) { - blockDataCleanup(pGrp->blk); - } - - taosArrayPopFrontBatch(pCache->grps, 1); + mWinJoinPopFrontGroup(pCtx, pGrp); grpNum--; i--; continue; @@ -2806,11 +2861,191 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) { break; } while (true); + return mWinJoinAddWinBeginBlk(pCtx); +} + +int32_t mWinJoinMoveDescWinBegin(SMJoinWindowCtx* pCtx) { + SMJoinWinCache* pCache = &pCtx->cache; + + do { + int32_t grpNum = taosArrayGetSize(pCache->grps); + for (int32_t i = 0; i < grpNum; ++i) { + SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i); + SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId); + if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) > pCtx->winEndTs) { + mWinJoinPopFrontGroup(pCtx, pGrp); + + grpNum--; + i--; + continue; + } + + int32_t startIdx = pGrp->beginIdx; + for (; pGrp->beginIdx < pGrp->blk->info.rows; pGrp->beginIdx++) { + if (*((int64_t*)pCol->pData + pGrp->beginIdx) > pCtx->winEndTs) { + continue; + } + + if (*((int64_t*)pCol->pData + pGrp->beginIdx) >= pCtx->winBeginTs) { + pGrp->readIdx = pGrp->beginIdx; + if (pGrp->endIdx < pGrp->beginIdx) { + pGrp->endIdx = pGrp->beginIdx; + pCache->rowNum = 1; + } else { + pCache->rowNum -= (pGrp->beginIdx - startIdx); + } + return TSDB_CODE_SUCCESS; + } + + pGrp->endIdx = pGrp->beginIdx; + pCache->rowNum = 0; + TSWAP(pCache->grps, pCache->grpsQueue); + return TSDB_CODE_SUCCESS; + } + } + + if (NULL != pCache->grpsQueue) { + pCache->grps = pCache->grpsQueue; + pCache->rowNum = 1; + pCache->grpsQueue = NULL; + continue; + } + + break; + } while (true); + + return mWinJoinAddWinBeginBlk(pCtx); +} + +void mWinJoinRemoveOverflowGrp(SMJoinWindowCtx* pCtx) { + if (pCtx->cache.rowNum <= pCtx->jLimit) { + return; + } + + int32_t i = 0; + while (true) { + SMJoinGrpRows* pGrp = taosArrayGet(pCtx->cache.grps, i++); + if (NULL == pGrp) { + return; + } + + if ((pCtx->cache.rowNum - (pGrp->blk->info.rows - pGrp->beginIdx)) < pCtx->jLimit) { + return; + } + + mWinJoinPopFrontGroup(pCtx, pGrp); + i--; + } +} + +int32_t mWinJoinTryAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) { + SSDataBlock* pBlk = build->blk; + SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCtx.targetSlotId); + SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx}; + + if (pCtx->ascTs) { + if (*((int64_t*)pCol->pData + build->blkRowIdx) > pCtx->winEndTs) { + *winEnd = true; + return TSDB_CODE_SUCCESS; + } + + if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) < pCtx->winBeginTs) { + *winEnd = false; + goto _return; + } + + if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) <= pCtx->winEndTs) { + SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp); + + pGrp->readIdx = pGrp->beginIdx; + pGrp->endIdx = pBlk->info.rows - 1; + + pCache->rowNum += (pGrp->endIdx - pGrp->beginIdx + 1); + if (pCache->rowNum >= pCtx->jLimit) { + pGrp->endIdx = pBlk->info.rows - 1 + pCtx->jLimit - pCache->rowNum; + pCache->rowNum = pCtx->jLimit; + + *winEnd = true; + goto _return; + } + + *winEnd = false; + goto _return; + } + + for (; build->blkRowIdx < pBlk->info.rows && pCache->rowNum < pCtx->jLimit; build->blkRowIdx++) { + if (*((int64_t*)pCol->pData + build->blkRowIdx) <= pCtx->winEndTs) { + pCache->rowNum++; + continue; + } + + break; + } + + SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp); + + pGrp->readIdx = pGrp->beginIdx; + pGrp->endIdx = build->blkRowIdx - 1; + + *winEnd = true; + goto _return; + } + + if (*((int64_t*)pCol->pData + build->blkRowIdx) < pCtx->winBeginTs) { + *winEnd = true; + return TSDB_CODE_SUCCESS; + } + + if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) > pCtx->winEndTs) { + *winEnd = false; + goto _return; + } + + if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) >= pCtx->winBeginTs) { + SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp); + + pGrp->readIdx = pGrp->beginIdx; + pGrp->endIdx = pBlk->info.rows - 1; + + pCache->rowNum += (pGrp->endIdx - pGrp->beginIdx + 1); + + mWinJoinRemoveOverflowGrp(pCtx); + + *winEnd = false; + goto _return; + } + + for (; build->blkRowIdx < pBlk->info.rows; build->blkRowIdx++) { + if (*((int64_t*)pCol->pData + build->blkRowIdx) >= pCtx->winBeginTs) { + pCache->rowNum++; + continue; + } + + break; + } + + SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp); + + pGrp->readIdx = pGrp->beginIdx; + pGrp->endIdx = build->blkRowIdx - 1; + + mWinJoinRemoveOverflowGrp(pCtx); + + *winEnd = true; + +_return: + + build->blk = NULL; + + return TSDB_CODE_SUCCESS; +} + +int32_t mWinJoinAddWinEndBlk(SMJoinWindowCtx* pCtx) { SMJoinTableCtx* build = pCtx->pJoin->build; bool winEnd = false; if (NULL != build->blk) { - MJ_ERR_RET(mWinJoinAddWinBeginBlk(pCtx, &pCtx->cache, build, &winEnd)); - if (winEnd || taosArrayGetSize(pCache->grps) > 0) { + MJ_ERR_RET(mWinJoinTryAddWinEndBlk(pCtx, &pCtx->cache, build, &winEnd)); + if (winEnd) { return TSDB_CODE_SUCCESS; } } @@ -2818,8 +3053,10 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) { if (build->dsFetchDone) { goto _return; } - + do { + MJ_ERR_RET(mWinJoinCloneCacheBlk(pCtx)); + build->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, pCtx->pJoin->build); qDebug("%s merge join %s table got block to start win, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0); @@ -2829,8 +3066,8 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) { break; } - MJ_ERR_RET(mWinJoinAddWinBeginBlk(pCtx, &pCtx->cache, build, &winEnd)); - if (winEnd || taosArrayGetSize(pCache->grps) > 0) { + MJ_ERR_RET(mWinJoinTryAddWinEndBlk(pCtx, &pCtx->cache, build, &winEnd)); + if (winEnd) { return TSDB_CODE_SUCCESS; } } while (true); @@ -2840,8 +3077,7 @@ _return: return TSDB_CODE_SUCCESS; } - -int32_t mWinJoinMoveWinEnd(SMJoinWindowCtx* pCtx) { +int32_t mWinJoinMoveAscWinEnd(SMJoinWindowCtx* pCtx) { SMJoinWinCache* pCache = &pCtx->cache; int32_t grpNum = taosArrayGetSize(pCache->grps); if (grpNum <= 0 || pCache->rowNum >= pCtx->jLimit) { @@ -2881,50 +3117,81 @@ int32_t mWinJoinMoveWinEnd(SMJoinWindowCtx* pCtx) { return TSDB_CODE_SUCCESS; } - SMJoinTableCtx* build = pCtx->pJoin->build; - bool winEnd = false; - if (NULL != build->blk) { - MJ_ERR_RET(mWinJoinAddWinEndBlk(pCtx, &pCtx->cache, build, &winEnd)); - if (winEnd) { - return TSDB_CODE_SUCCESS; - } - } + return mWinJoinAddWinEndBlk(pCtx); +} - if (build->dsFetchDone) { - goto _return; +int32_t mWinJoinMoveDescWinEnd(SMJoinWindowCtx* pCtx) { + SMJoinWinCache* pCache = &pCtx->cache; + int32_t grpNum = taosArrayGetSize(pCache->grps); + if (grpNum <= 0) { + return TSDB_CODE_SUCCESS; } + + SMJoinGrpRows* pGrp = taosArrayGetLast(pCache->grps); + SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId); + if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) >= pCtx->winBeginTs) { + pCache->rowNum += pGrp->blk->info.rows - pGrp->endIdx - 1; + pGrp->endIdx = pGrp->blk->info.rows - 1; + } else { + int32_t startIdx = pGrp->endIdx; + for (; ++pGrp->endIdx < pGrp->blk->info.rows; ) { + if (*((int64_t*)pCol->pData + pGrp->endIdx) >= pCtx->winBeginTs) { + pCache->rowNum++; + if ((pGrp->endIdx + 1) >= pGrp->blk->info.rows) { + break; + } + + continue; + } - do { - MJ_ERR_RET(mWinJoinCloneCacheBlk(pCtx)); - - build->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, pCtx->pJoin->build); - qDebug("%s merge join %s table got block to start win, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0); - - build->blkRowIdx = 0; - - if (NULL == build->blk) { + ASSERT(pGrp->endIdx > startIdx); + + pGrp->endIdx--; break; } - MJ_ERR_RET(mWinJoinAddWinEndBlk(pCtx, &pCtx->cache, build, &winEnd)); - if (winEnd) { - return TSDB_CODE_SUCCESS; - } - } while (true); + return TSDB_CODE_SUCCESS; + } -_return: - - return TSDB_CODE_SUCCESS; + return mWinJoinAddWinEndBlk(pCtx); } int32_t mWinJoinMoveFillWinCache(SMJoinWindowCtx* pCtx) { - MJ_ERR_RET(mWinJoinMoveWinBegin(pCtx)); - MJ_ERR_RET(mWinJoinMoveWinEnd(pCtx)); + MJ_ERR_RET((*pCtx->moveWinBeginFp)(pCtx)); + MJ_ERR_RET((*pCtx->moveWinEndFp)(pCtx)); return TSDB_CODE_SUCCESS; } +int32_t mWinJoinTrimDumpGrpCache(SMJoinWindowCtx* pCtx) { + if (!pCtx->ascTs) { + SMJoinWinCache* cache = &pCtx->cache; + if (cache->rowNum > pCtx->jLimit) { + int32_t skipRows = cache->rowNum - pCtx->jLimit; + int32_t buildGrpNum = taosArrayGetSize(cache->grps); + for (int32_t i = 0; i < buildGrpNum && skipRows > 0; ++i) { + SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, i); + if (skipRows >= GRP_REMAIN_ROWS(buildGrp)) { + skipRows -= GRP_REMAIN_ROWS(buildGrp); + mWinJoinPopFrontGroup(pCtx, buildGrp); + buildGrpNum--; + i--; + continue; + } else { + buildGrp->beginIdx += skipRows; + buildGrp->readIdx = buildGrp->beginIdx; + break; + } + } + + cache->rowNum = pCtx->jLimit; + } + } + + return mWinJoinDumpGrpCache(pCtx); +} + SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoin = pOperator->info; SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx; @@ -2965,7 +3232,7 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) { MJ_ERR_JRET(mWinJoinMoveFillWinCache(pCtx)); } - MJ_ERR_JRET(mWinJoinDumpGrpCache(pCtx)); + MJ_ERR_JRET(mWinJoinTrimDumpGrpCache(pCtx)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold || (pCtx->finBlk->info.rows > 0 && pCtx->seqWinGrp)) { return pCtx->finBlk; @@ -3030,6 +3297,10 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p pJoin->outGrpId = 1; } + if (pJoinNode->node.inputTsOrder != ORDER_DESC) { + pCtx->ascTs = true; + } + switch (pJoinNode->subType) { case JOIN_STYPE_ASOF: pCtx->asofOpType = pJoinNode->asofOpType; @@ -3038,10 +3309,11 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p pCtx->lowerRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType); pCtx->greaterRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType); - if (pCtx->lowerRowsAcq) { - pJoin->joinFp = mAsofLowerJoinDo; - } else if (pCtx->greaterRowsAcq) { - pJoin->joinFp = mAsofGreaterJoinDo; + if ((pCtx->ascTs && pCtx->lowerRowsAcq) || (!pCtx->ascTs && pCtx->greaterRowsAcq) ) { + pJoin->joinFp = mAsofBackwardJoinDo; + } else { + pJoin->joinFp = mAsofForwardJoinDo; + pCtx->forwardRowsAcq = true; } pJoin->grpResetFp = mAsofJoinGroupReset; break; @@ -3055,6 +3327,11 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p pCtx->eqRowsAcq = (pCtx->winBeginOffset <= 0 && pCtx->winEndOffset >= 0); pCtx->lowerRowsAcq = pCtx->winBeginOffset < 0; pCtx->greaterRowsAcq = pCtx->winEndOffset > 0; + pCtx->moveWinBeginFp = (joinMoveWin)(pCtx->ascTs ? mWinJoinMoveAscWinBegin : mWinJoinMoveDescWinBegin); + pCtx->moveWinEndFp = (joinMoveWin)(pCtx->ascTs ? mWinJoinMoveAscWinEnd : mWinJoinMoveDescWinEnd); + if ((pCtx->ascTs && !pCtx->lowerRowsAcq) || (!pCtx->ascTs && !pCtx->greaterRowsAcq) ) { + pCtx->forwardRowsAcq = true; + } break; } default: diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index f23007ded2..acf0b6e622 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -633,7 +633,7 @@ int32_t mJoinProcessLowerGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnI while (++pTb->blkRowIdx < pTb->blk->info.rows) { MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pTb); - if (PROBE_TS_LOWER(pCtx->ascTs, *probeTs, *buildTs)) { + if (PROBE_TS_NMATCH(pCtx->ascTs, *probeTs, *buildTs)) { pCtx->probeNEqGrp.endIdx = pTb->blkRowIdx; continue; } @@ -652,7 +652,7 @@ int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColum while (++pTb->blkRowIdx < pTb->blk->info.rows) { MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pTb); - if (PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) { + if (PROBE_TS_NREACH(pCtx->ascTs, *probeTs, *buildTs)) { pCtx->buildNEqGrp.endIdx = pTb->blkRowIdx; continue; } diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index bea0a54270..6186367e1b 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -66,7 +66,7 @@ enum { }; #define COL_DISPLAY_WIDTH 18 -#define JT_MAX_LOOP 20000 +#define JT_MAX_LOOP 5000 #define LEFT_BLK_ID 0 #define RIGHT_BLK_ID 1 @@ -957,7 +957,7 @@ void appendAllAsofResRows() { taosArrayClear(jtCtx.leftRowsList); } -void chkAppendAsofGreaterResRows(bool forceOut) { +void chkAppendAsofForwardGrpResRows(bool forceOut) { int32_t rightRows = taosArrayGetSize(jtCtx.rightRowsList); if (rightRows < jtCtx.jLimit && !forceOut) { return; @@ -974,7 +974,7 @@ void chkAppendAsofGreaterResRows(bool forceOut) { for (int32_t r = rightOffset; r < rightRows; ++r) { char* rightRow = (char*)taosArrayGet(jtCtx.rightRowsList, r); int64_t* rightTs = (int64_t*)(rightRow + jtCtx.inColOffset[JT_PRIM_TS_SLOT_ID]); - if ((*leftTs > *rightTs) || (*leftTs == *rightTs && OP_TYPE_LOWER_THAN == jtCtx.asofOpType)) { + if (((jtCtx.asc && *leftTs > *rightTs) || (!jtCtx.asc && *leftTs < *rightTs)) || (*leftTs == *rightTs && (OP_TYPE_LOWER_THAN == jtCtx.asofOpType || OP_TYPE_GREATER_THAN == jtCtx.asofOpType))) { rightOffset++; rightRemains--; if (rightRemains < jtCtx.jLimit && !forceOut) { @@ -1031,7 +1031,8 @@ void appendWinEachResGrps(char* leftInRow, int32_t rightOffset, int32_t rightRow } int32_t endIdx = rightRows + rightOffset; - for (int32_t r = rightOffset; r < endIdx; ++r) { + int32_t beginIdx = (!jtCtx.asc && rightRows > jtCtx.jLimit) ? (endIdx - jtCtx.jLimit) : rightOffset; + for (int32_t r = beginIdx; r < endIdx; ++r) { bool* rightFilterOut = (bool*)taosArrayGet(jtCtx.rightFilterOut, r); if (*rightFilterOut) { continue; @@ -1075,7 +1076,7 @@ void chkAppendWinResRows(bool forceOut) { for (int32_t r = rightOffset; r < rightRows; ++r) { char* rightRow = (char*)taosArrayGet(jtCtx.rightRowsList, r); int64_t* rightTs = (int64_t*)(rightRow + jtCtx.inColOffset[JT_PRIM_TS_SLOT_ID]); - if (*rightTs < winStart) { + if ((jtCtx.asc && *rightTs < winStart) || (!jtCtx.asc && *rightTs > winEnd)) { rightOffset++; rightRemains--; if (rightRemains < jtCtx.jLimit && !forceOut) { @@ -1086,7 +1087,7 @@ void chkAppendWinResRows(bool forceOut) { } continue; - } else if (*rightTs > winEnd) { + } else if ((jtCtx.asc && *rightTs > winEnd) || (!jtCtx.asc && *rightTs < winStart)) { winClosed = true; appendWinEachResGrps(leftRow, winBeginIdx, r - winBeginIdx); append = true; @@ -1097,7 +1098,7 @@ void chkAppendWinResRows(bool forceOut) { winBeginIdx = r; } - if ((r - winBeginIdx + 1) >= jtCtx.jLimit) { + if (jtCtx.asc && (r - winBeginIdx + 1) >= jtCtx.jLimit) { appendWinEachResGrps(leftRow, winBeginIdx, jtCtx.jLimit); append = true; break; @@ -1213,7 +1214,7 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { switch (jtInputColType[c]) { case TSDB_DATA_TYPE_TIMESTAMP: - ++jtCtx.curTs; + jtCtx.asc ? ++jtCtx.curTs : --jtCtx.curTs; pData = (char*)&jtCtx.curTs; isNull = false; if (!filterOut && filterNum && filterCol[c] && jtCtx.curTs <= TIMESTAMP_FILTER_VALUE) { @@ -1292,14 +1293,14 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { if (keepInput) { if (JOIN_STYPE_ASOF == jtCtx.subType) { - if (jtCtx.asofOpType == OP_TYPE_GREATER_EQUAL || jtCtx.asofOpType == OP_TYPE_GREATER_THAN || jtCtx.asofOpType == OP_TYPE_EQUAL) { + if (((jtCtx.asc && (jtCtx.asofOpType == OP_TYPE_GREATER_EQUAL || jtCtx.asofOpType == OP_TYPE_GREATER_THAN)) || (!jtCtx.asc && (jtCtx.asofOpType == OP_TYPE_LOWER_EQUAL || jtCtx.asofOpType == OP_TYPE_LOWER_THAN)) ) || jtCtx.asofOpType == OP_TYPE_EQUAL) { if (blkId == LEFT_BLK_ID) { appendAllAsofResRows(); } else { trimForAsofJlimit(); } } else { - chkAppendAsofGreaterResRows(false); + chkAppendAsofForwardGrpResRows(false); } } else { chkAppendWinResRows(false); @@ -1965,7 +1966,7 @@ void addAsofEqInRows(int32_t rowsNum, int64_t tbOffset, bool leftTable) { } } - if (!leftTable && (jtCtx.asofOpType == OP_TYPE_GREATER_EQUAL || jtCtx.asofOpType == OP_TYPE_GREATER_THAN || jtCtx.asofOpType == OP_TYPE_EQUAL)) { + if (!leftTable && ((jtCtx.asc && (jtCtx.asofOpType == OP_TYPE_GREATER_EQUAL || jtCtx.asofOpType == OP_TYPE_GREATER_THAN)) || (!jtCtx.asc && (jtCtx.asofOpType == OP_TYPE_LOWER_EQUAL || jtCtx.asofOpType == OP_TYPE_LOWER_THAN))) || jtCtx.asofOpType == OP_TYPE_EQUAL) { trimForAsofJlimit(); } } @@ -1973,22 +1974,58 @@ void addAsofEqInRows(int32_t rowsNum, int64_t tbOffset, bool leftTable) { void asofJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { int64_t rightTbOffset = jtCtx.blkRowSize * leftGrpRows; + if (jtCtx.asc) { + switch (jtCtx.asofOpType) { + case OP_TYPE_GREATER_THAN: + addAsofEqInRows(leftGrpRows, 0, true); + appendAllAsofResRows(); + addAsofEqInRows(rightGrpRows, rightTbOffset, false); + break; + case OP_TYPE_GREATER_EQUAL: + addAsofEqInRows(leftGrpRows, 0, true); + addAsofEqInRows(rightGrpRows, rightTbOffset, false); + appendAllAsofResRows(); + break; + case OP_TYPE_LOWER_THAN: + case OP_TYPE_LOWER_EQUAL: + addAsofEqInRows(leftGrpRows, 0, true); + addAsofEqInRows(rightGrpRows, rightTbOffset, false); + chkAppendAsofForwardGrpResRows(false); + break; + case OP_TYPE_EQUAL: + taosArrayClear(jtCtx.leftRowsList); + taosArrayClear(jtCtx.rightRowsList); + taosArrayClear(jtCtx.rightFilterOut); + addAsofEqInRows(leftGrpRows, 0, true); + addAsofEqInRows(rightGrpRows, rightTbOffset, false); + chkAppendAsofForwardGrpResRows(true); + taosArrayClear(jtCtx.leftRowsList); + taosArrayClear(jtCtx.rightRowsList); + taosArrayClear(jtCtx.rightFilterOut); + break; + default: + return; + } + + return; + } + switch (jtCtx.asofOpType) { - case OP_TYPE_GREATER_THAN: - addAsofEqInRows(leftGrpRows, 0, true); - appendAllAsofResRows(); - addAsofEqInRows(rightGrpRows, rightTbOffset, false); - break; - case OP_TYPE_GREATER_EQUAL: - addAsofEqInRows(leftGrpRows, 0, true); - addAsofEqInRows(rightGrpRows, rightTbOffset, false); - appendAllAsofResRows(); - break; case OP_TYPE_LOWER_THAN: + addAsofEqInRows(leftGrpRows, 0, true); + appendAllAsofResRows(); + addAsofEqInRows(rightGrpRows, rightTbOffset, false); + break; case OP_TYPE_LOWER_EQUAL: addAsofEqInRows(leftGrpRows, 0, true); addAsofEqInRows(rightGrpRows, rightTbOffset, false); - chkAppendAsofGreaterResRows(false); + appendAllAsofResRows(); + break; + case OP_TYPE_GREATER_THAN: + case OP_TYPE_GREATER_EQUAL: + addAsofEqInRows(leftGrpRows, 0, true); + addAsofEqInRows(rightGrpRows, rightTbOffset, false); + chkAppendAsofForwardGrpResRows(false); break; case OP_TYPE_EQUAL: taosArrayClear(jtCtx.leftRowsList); @@ -1996,7 +2033,7 @@ void asofJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { taosArrayClear(jtCtx.rightFilterOut); addAsofEqInRows(leftGrpRows, 0, true); addAsofEqInRows(rightGrpRows, rightTbOffset, false); - chkAppendAsofGreaterResRows(true); + chkAppendAsofForwardGrpResRows(true); taosArrayClear(jtCtx.leftRowsList); taosArrayClear(jtCtx.rightRowsList); taosArrayClear(jtCtx.rightFilterOut); @@ -2227,7 +2264,7 @@ void createTsEqGrpRows(SSDataBlock** ppLeft, SSDataBlock** ppRight, int32_t left jtCtx.inputStat |= (1 << 2); } - ++jtCtx.curTs; + jtCtx.asc ? ++jtCtx.curTs : --jtCtx.curTs; if (NULL == *ppLeft && leftGrpRows > 0) { *ppLeft = createDummyBlock(LEFT_BLK_ID); @@ -2257,8 +2294,9 @@ void createTsEqGrpRows(SSDataBlock** ppLeft, SSDataBlock** ppRight, int32_t left void forceFlushResRows() { if (JOIN_STYPE_ASOF == jtCtx.subType && taosArrayGetSize(jtCtx.leftRowsList) > 0) { - ASSERT(OP_TYPE_LOWER_EQUAL == jtCtx.asofOpType || OP_TYPE_LOWER_THAN == jtCtx.asofOpType); - chkAppendAsofGreaterResRows(true); + ASSERT((jtCtx.asc && (OP_TYPE_LOWER_EQUAL == jtCtx.asofOpType || OP_TYPE_LOWER_THAN == jtCtx.asofOpType)) + || (!jtCtx.asc && (OP_TYPE_GREATER_EQUAL == jtCtx.asofOpType || OP_TYPE_GREATER_THAN == jtCtx.asofOpType))); + chkAppendAsofForwardGrpResRows(true); } else if (JOIN_STYPE_WIN == jtCtx.subType && taosArrayGetSize(jtCtx.leftRowsList) > 0) { chkAppendWinResRows(true); } @@ -2277,7 +2315,8 @@ void createBothBlkRowsData(void) { jtCtx.rightTotalRows = taosRand() % jtCtx.rightMaxRows; int32_t minTotalRows = TMIN(jtCtx.leftTotalRows, jtCtx.rightTotalRows); - jtCtx.curTs = TIMESTAMP_FILTER_VALUE - minTotalRows / 5; + int32_t maxTotalRows = TMAX(jtCtx.leftTotalRows, jtCtx.rightTotalRows); + jtCtx.curTs = jtCtx.asc ? (TIMESTAMP_FILTER_VALUE - minTotalRows / 5) : (TIMESTAMP_FILTER_VALUE + 4 * maxTotalRows / 5); int32_t leftTotalRows = 0, rightTotalRows = 0; int32_t leftGrpRows = 0, rightGrpRows = 0; @@ -2463,8 +2502,8 @@ void printInputData() { printInputRowData(pBlk, &leftRowIdx); break; } - - printf("\t--------------------------blk end------------------------------- "); + + printf("\t%*s-------------------------blk end-------------------------------", jtCtx.grpJoin ? 6 : 0, " "); jtCtx.leftBlkReadIdx++; leftRowIdx = 0; break; @@ -2481,7 +2520,7 @@ void printInputData() { break; } - printf("\t%*s--------------------------blk end----------------------------\t", jtCtx.grpJoin ? 5 : 0, " "); + printf("\t%*s--------------------------blk end----------------------------\t", jtCtx.grpJoin ? 6 : 0, " "); jtCtx.rightBlkReadIdx++; rightRowIdx = 0; break; @@ -2801,7 +2840,7 @@ void runSingleTest(char* caseName, SJoinTestParam* param) { bool contLoop = true; SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param); - createDummyBlkList(12, 3, 12, 3, 3); + createDummyBlkList(100, 10, 100, 10, 10); while (contLoop) { rerunBlockedHere(); @@ -2848,6 +2887,7 @@ TEST(innerJoin, noCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -2874,6 +2914,7 @@ TEST(innerJoin, eqCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -2900,6 +2941,7 @@ TEST(innerJoin, onCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -2926,6 +2968,7 @@ TEST(innerJoin, fullCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -2954,6 +2997,7 @@ TEST(leftOuterJoin, noCondTest) { param.asc = true; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.grpJoin = taosRand() % 2 ? true : false; param.filter = false; runSingleTest(caseName, ¶m); @@ -2982,6 +3026,7 @@ TEST(leftOuterJoin, eqCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -3008,6 +3053,7 @@ TEST(leftOuterJoin, onCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -3034,6 +3080,7 @@ TEST(leftOuterJoin, fullCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -3062,6 +3109,7 @@ TEST(fullOuterJoin, noCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -3088,6 +3136,7 @@ TEST(fullOuterJoin, eqCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -3115,6 +3164,7 @@ TEST(fullOuterJoin, onCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -3141,6 +3191,7 @@ TEST(fullOuterJoin, fullCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -3169,6 +3220,7 @@ TEST(leftSemiJoin, noCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -3195,6 +3247,7 @@ TEST(leftSemiJoin, eqCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -3222,6 +3275,7 @@ TEST(leftSemiJoin, onCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -3248,6 +3302,7 @@ TEST(leftSemiJoin, fullCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -3276,6 +3331,7 @@ TEST(leftAntiJoin, noCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -3302,6 +3358,7 @@ TEST(leftAntiJoin, eqCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -3329,6 +3386,7 @@ TEST(leftAntiJoin, onCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -3355,6 +3413,7 @@ TEST(leftAntiJoin, fullCondTest) { param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.filter = false; runSingleTest(caseName, ¶m); @@ -3383,6 +3442,7 @@ TEST(leftAsofJoin, noCondGreaterThanTest) { param.asc = true; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1; param.grpJoin = taosRand() % 2 ? true : false; @@ -3413,6 +3473,7 @@ TEST(leftAsofJoin, noCondGreaterEqTest) { param.asc = true; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1; param.grpJoin = taosRand() % 2 ? true : false; @@ -3443,6 +3504,7 @@ TEST(leftAsofJoin, noCondEqTest) { param.asc = true; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1; param.grpJoin = taosRand() % 2 ? true : false; @@ -3473,6 +3535,7 @@ TEST(leftAsofJoin, noCondLowerThanTest) { param.asc = true; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1; param.grpJoin = taosRand() % 2 ? true : false; @@ -3504,6 +3567,7 @@ TEST(leftAsofJoin, noCondLowerEqTest) { param.asc = true; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1; param.grpJoin = taosRand() % 2 ? true : false; @@ -3537,6 +3601,7 @@ TEST(leftWinJoin, noCondProjectionTest) { param.asc = true; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.asc = !param.asc; param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1; param.grpJoin = taosRand() % 2 ? true : false; diff --git a/tests/script/tsim/join/inner_join.sim b/tests/script/tsim/join/inner_join.sim index 8130ff63ac..189580a80a 100644 --- a/tests/script/tsim/join/inner_join.sim +++ b/tests/script/tsim/join/inner_join.sim @@ -134,4 +134,39 @@ if $data13 != 3 then return -1 endi +sql select a.ts, b.ts from sta a join sta b on a.ts=b.ts order by a.ts desc; +if $rows != 12 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:04.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a join sta b on a.ts=b.ts order by b.ts desc; +if $rows != 12 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:04.000@ then + return -1 +endi + + diff --git a/tests/script/tsim/join/left_anti_join.sim b/tests/script/tsim/join/left_anti_join.sim index d8de1bd8b3..a04dcba9f9 100644 --- a/tests/script/tsim/join/left_anti_join.sim +++ b/tests/script/tsim/join/left_anti_join.sim @@ -86,3 +86,36 @@ if $data11 != NULL then return -1 endi +sql select a.ts, b.ts from tba1 a left anti join tba2 b on a.ts = b.ts order by a.ts desc; +if $rows != 2 then + return -1 +endi +if $data00 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data01 != NULL then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != NULL then + return -1 +endi + +sql select a.ts, b.ts from sta a left anti join sta b on a.ts = b.ts and b.ts < '2023-11-17 16:29:03.000' order by a.ts desc; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != NULL then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != NULL then + return -1 +endi diff --git a/tests/script/tsim/join/left_asof_join.sim b/tests/script/tsim/join/left_asof_join.sim index 354c86f86b..d9471c0468 100644 --- a/tests/script/tsim/join/left_asof_join.sim +++ b/tests/script/tsim/join/left_asof_join.sim @@ -977,6 +977,188 @@ if $rows != 10 then return -1 endi +sql select a.ts, b.ts from sta a left asof join sta b order by a.ts desc; +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:04.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left asof join sta b jlimit 2 order by a.ts desc; +if $rows != 16 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:03.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left asof join sta b on a.ts > b.ts order by a.ts desc; +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:03.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left asof join sta b on a.ts > b.ts jlimit 2 order by a.ts desc; +if $rows != 14 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:03.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left asof join sta b on a.ts <= b.ts order by a.ts desc; +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:04.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left asof join sta b on a.ts <= b.ts jlimit 2 order by a.ts desc; +if $rows != 15 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:05.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left asof join sta b on a.ts < b.ts order by a.ts desc; +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != NULL then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:05.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left asof join sta b on a.ts < b.ts jlimit 2 order by a.ts desc; +if $rows != 14 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != NULL then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:05.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left asof join sta b on a.ts = b.ts order by a.ts desc; +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:04.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left asof join sta b on a.ts = b.ts jlimit 2 order by a.ts desc; +if $rows != 12 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:04.000@ then + return -1 +endi + sql_error select a.ts, b.ts from sta a asof join sta b on a.ts = b.ts; sql_error select a.ts, b.ts from sta a full asof join sta b on a.ts = b.ts; sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts != b.ts; diff --git a/tests/script/tsim/join/left_join.sim b/tests/script/tsim/join/left_join.sim index d88c49f8c8..13e2774163 100644 --- a/tests/script/tsim/join/left_join.sim +++ b/tests/script/tsim/join/left_join.sim @@ -158,3 +158,49 @@ if $data31 != NULL then return -1 endi +sql select a.ts, b.ts from tba2 a left join tba1 b on a.ts = b.ts order by a.ts desc; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != NULL then + return -1 +endi +if $data10 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data21 != NULL then + return -1 +endi +if $data30 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:00.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left join sta b on a.ts=b.ts order by a.ts desc; +if $rows != 12 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:04.000@ then + return -1 +endi + diff --git a/tests/script/tsim/join/left_semi_join.sim b/tests/script/tsim/join/left_semi_join.sim index e7fa4da0f3..27872ce0a3 100644 --- a/tests/script/tsim/join/left_semi_join.sim +++ b/tests/script/tsim/join/left_semi_join.sim @@ -74,6 +74,41 @@ if $data11 != 4 then return -1 endi +sql select a.ts, b.ts from tba1 a left semi join tba2 b on a.ts = b.ts order by a.ts desc; +if $rows != 2 then + return -1 +endi +if $data00 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:00.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left semi join sta b on a.ts = b.ts order by a.ts desc; +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:04.000@ then + return -1 +endi + + sql_error select a.ts, b.ts from sta a left semi join sta b jlimit 3 where a.ts > b.ts; sql_error select a.ts, b.ts from sta a left semi join sta b where a.ts > b.ts; sql_error select a.ts, b.ts from sta a left semi join sta b on a.ts > 1 where a.ts = b.ts; diff --git a/tests/script/tsim/join/left_win_join.sim b/tests/script/tsim/join/left_win_join.sim index 504b9d15a0..972c9e954c 100644 --- a/tests/script/tsim/join/left_win_join.sim +++ b/tests/script/tsim/join/left_win_join.sim @@ -624,6 +624,322 @@ if $data31 != 1 then return -1 endi +sql select a.ts, b.ts from sta a left window join sta b window_offset(-2s, -1s) order by a.ts desc; +if $rows != 17 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +sql select a.ts, b.ts from sta a left window join sta b window_offset(-2s, -1s) jlimit 1 order by a.ts desc; +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:01.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left window join sta b window_offset(-2s, -1s) jlimit 1; +if $rows != 8 then + return -1 +endi +if $data50 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data51 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data60 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data61 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data70 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data71 != @23-11-17 16:29:03.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left window join sta b window_offset(0s, 0s) order by a.ts desc; +if $rows != 12 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left window join sta b window_offset(-1s, 1s) order by a.ts desc; +if $rows != 28 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:05.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left window join sta b window_offset(1s, 2s) order by a.ts desc; +if $rows != 16 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != NULL then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:05.000@ then + return -1 +endi +sql select a.ts, b.ts from sta a left window join sta b window_offset(1s, 2s) jlimit 1 order by a.ts desc; +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != NULL then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:04.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left window join sta b on a.t1=b.t1 window_offset(-2s, -1s) order by a.ts desc, b.ts; +if $rows != 9 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +sql select a.ts, b.ts from sta a left window join sta b on a.t1=b.t1 window_offset(0s, 0s) order by a.ts desc; +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +sql select a.ts, b.ts from sta a left window join sta b on a.t1=b.t1 window_offset(-1s, 1s) order by a.ts desc, b.ts desc; +if $rows != 14 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:04.000@ then + return -1 +endi +sql select a.ts, b.ts from sta a left window join sta b on a.t1=b.t1 window_offset(1s, 2s) order by a.ts desc, b.ts desc; +if $rows != 9 then + return -1 +endi +if $data00 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data01 != NULL then + return -1 +endi +if $data10 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data11 != NULL then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:04.000@ then + return -1 +endi +sql select count(*) from sta a left window join sta b on a.t1=b.t1 window_offset(1s, 2s) order by a.ts desc; +if $rows != 8 then + return -1 +endi +if $data00 != 1 then + return -1 +endi +if $data10 != 1 then + return -1 +endi +if $data20 != 1 then + return -1 +endi +if $data30 != 1 then + return -1 +endi +if $data40 != 2 then + return -1 +endi +if $data50 != 1 then + return -1 +endi +if $data60 != 1 then + return -1 +endi +if $data70 != 1 then + return -1 +endi +sql select count(b.ts) from sta a left window join sta b on a.t1=b.t1 window_offset(1s, 2s) order by a.ts desc; +if $rows != 8 then + return -1 +endi +if $data00 != 0 then + return -1 +endi +if $data10 != 0 then + return -1 +endi +if $data20 != 1 then + return -1 +endi +if $data30 != 1 then + return -1 +endi +if $data40 != 2 then + return -1 +endi +if $data50 != 1 then + return -1 +endi +if $data60 != 1 then + return -1 +endi +if $data70 != 1 then + return -1 +endi + sql_error select a.col1, count(*) from sta a left window join sta b window_offset(-1s, 1s); sql_error select b.ts, count(*) from sta a left window join sta b window_offset(-1s, 1s); sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1s, 1s) having(b.ts > 0);