From 8a2b4e2d86f7f5e23584f62d28f125e111575195 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 27 Feb 2024 18:18:25 +0800 Subject: [PATCH] enh: support group join --- include/libs/nodes/plannodes.h | 1 + source/libs/executor/inc/mergejoin.h | 37 +- source/libs/executor/src/mergejoin.c | 216 +++++++--- source/libs/executor/src/mergejoinoperator.c | 126 +++++- source/libs/nodes/src/nodesCloneFuncs.c | 1 + source/libs/planner/src/planOptimizer.c | 45 +- source/libs/planner/src/planSpliter.c | 2 +- tests/script/tsim/join/left_asof_join.sim | 429 +++++++++++++++++++ tests/script/tsim/join/left_win_join.sim | 189 ++++++++ 9 files changed, 946 insertions(+), 100 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index faf196f5f3..ec5571f11c 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -140,6 +140,7 @@ typedef struct SJoinLogicNode { SNode* pFullOnCond; // except prim eq cond SNodeList* pLeftEqNodes; SNodeList* pRightEqNodes; + bool allEqTags; bool isSingleTableJoin; bool hasSubQuery; bool isLowLevelJoin; diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 65ea21ccad..e1f82ddeca 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -33,14 +33,12 @@ extern "C" { struct SMJoinOperatorInfo; -typedef SSDataBlock* (*joinImplFp)(SOperatorInfo*); -typedef int32_t (*joinCartFp)(void*); - typedef enum EJoinTableType { E_JOIN_TB_BUILD = 1, E_JOIN_TB_PROBE } EJoinTableType; + #define MJOIN_TBTYPE(_type) (E_JOIN_TB_BUILD == (_type) ? "BUILD" : "PROBE") #define IS_FULL_OUTER_JOIN(_jtype, _stype) ((_jtype) == JOIN_TYPE_FULL && (_stype) == JOIN_STYPE_OUTER) @@ -171,6 +169,7 @@ typedef struct SMJoinGrpRows { bool lastEqGrp; \ bool lastProbeGrp; \ bool seqWinGrp; \ + bool groupJoin; \ int32_t blkThreshold; \ int64_t jLimit @@ -178,6 +177,8 @@ typedef struct SMJoinCommonCtx { MJOIN_COMMON_CTX; } SMJoinCommonCtx; +typedef int32_t (*joinCartFp)(void*); + typedef struct SMJoinMergeCtx { // KEEP IT FIRST struct SMJoinOperatorInfo* pJoin; @@ -187,12 +188,12 @@ typedef struct SMJoinMergeCtx { bool lastEqGrp; bool lastProbeGrp; bool seqWinGrp; + bool groupJoin; int32_t blkThreshold; int64_t jLimit; // KEEP IT FIRST bool hashCan; - bool keepOrder; bool midRemains; bool nmatchRemains; SSDataBlock* midBlk; @@ -230,6 +231,7 @@ typedef struct SMJoinWindowCtx { bool lastEqGrp; bool lastProbeGrp; bool seqWinGrp; + bool groupJoin; int32_t blkThreshold; int64_t jLimit; // KEEP IT FIRST @@ -240,14 +242,11 @@ typedef struct SMJoinWindowCtx { bool lowerRowsAcq; bool eqRowsAcq; bool greaterRowsAcq; - bool groupJoin; - int64_t seqGrpId; int64_t winBeginTs; int64_t winEndTs; bool eqPostDone; int64_t lastTs; - bool rowRemains; SMJoinGrpRows probeGrp; SMJoinGrpRows buildGrp; SMJoinWinCache cache; @@ -279,26 +278,28 @@ typedef struct SMJoinExecInfo { int64_t expectRows; } SMJoinExecInfo; -typedef struct SMJoinRetrieveCtx { - bool grpRetrieve; - uint64_t lastGid[2]; - SSDataBlock* remainBlk[2]; -} SMJoinRetrieveCtx; + +typedef SSDataBlock* (*joinImplFp)(SOperatorInfo*); +typedef SSDataBlock* (*joinRetrieveFp)(struct SMJoinOperatorInfo*, SMJoinTableCtx*); +typedef void (*joinResetFp)(struct SMJoinOperatorInfo*); + typedef struct SMJoinOperatorInfo { SOperatorInfo* pOperator; int32_t joinType; int32_t subType; int32_t inputTsOrder; - int32_t errCode; + int32_t errCode; + int64_t outGrpId; SMJoinTableCtx tbs[2]; SMJoinTableCtx* build; SMJoinTableCtx* probe; - SMJoinRetrieveCtx retrieveCtx; SFilterInfo* pFPreFilter; SFilterInfo* pPreFilter; SFilterInfo* pFinFilter; joinImplFp joinFp; + joinRetrieveFp retrieveFp; + joinResetFp grpResetFp; SMJoinCtx ctx; SMJoinExecInfo execInfo; } SMJoinOperatorInfo; @@ -420,11 +421,15 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator); SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator); SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator); SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator); -bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb); +void mJoinResetGroupTableCtx(SMJoinTableCtx* pCtx); +void mJoinResetTableCtx(SMJoinTableCtx* pCtx); +void mWinJoinGroupReset(SMJoinOperatorInfo* pJoin); +bool mJoinRetrieveBlk(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb); void mJoinSetDone(SOperatorInfo* pOperator); +bool mJoinIsDone(SOperatorInfo* pOperator); bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen); int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart); -int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableCtx* pTable, int64_t timestamp); +int32_t mJoinRetrieveEqGrpRows(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable, int64_t timestamp); int32_t mJoinCreateFullBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable); int32_t mJoinCreateBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable); int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable); diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index 338da457ad..a3f00ed25c 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -35,12 +35,12 @@ int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) { int32_t buildGrpNum = taosArrayGetSize(cache->grps); int64_t buildTotalRows = TMIN(cache->rowNum, pCtx->jLimit); - pCtx->finBlk->info.id.groupId = (pCtx->seqWinGrp || pCtx->groupJoin) ? pCtx->seqGrpId : 0; + pCtx->finBlk->info.id.groupId = pCtx->seqWinGrp ? pCtx->pJoin->outGrpId : 0; if (buildGrpNum <= 0 || buildTotalRows <= 0) { MJ_ERR_RET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, pCtx->seqWinGrp)); if (pCtx->seqWinGrp) { - pCtx->seqGrpId++; + pCtx->pJoin->outGrpId++; } return TSDB_CODE_SUCCESS; } @@ -90,7 +90,7 @@ int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) { cache->grpIdx = 0; ++probeGrp->readIdx; if (pCtx->seqWinGrp) { - pCtx->seqGrpId++; + pCtx->pJoin->outGrpId++; break; } } @@ -462,17 +462,20 @@ static int32_t mLeftJoinMergeCart(SMJoinMergeCtx* pCtx) { -static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { - bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); +static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { + bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool buildGot = false; do { if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { - buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); + buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); } if (!probeGot) { - mJoinSetDone(pOperator); + if (!pCtx->groupJoin || NULL == pJoin->probe->remainInBlk) { + mJoinSetDone(pOperator); + } + return false; } @@ -531,7 +534,11 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { } do { - if (!mLeftJoinRetrieve(pOperator, pJoin)) { + if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) { + if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) { + continue; + } + break; } @@ -578,7 +585,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { } } - if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) { + if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && (pJoin->build->dsFetchDone || (pCtx->groupJoin && NULL == pJoin->build->blk))) { pCtx->probeNEqGrp.blk = pJoin->probe->blk; pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx; pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx; @@ -603,6 +610,20 @@ _return: return pCtx->finBlk; } +void mLeftJoinGroupReset(SMJoinOperatorInfo* pJoin) { + SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; + + pCtx->lastEqGrp = false; + pCtx->lastProbeGrp = false; + pCtx->hashCan = false; + pCtx->midRemains = false; + pCtx->lastEqTs = INT64_MIN; + + mJoinResetGroupTableCtx(pJoin->probe); + mJoinResetGroupTableCtx(pJoin->build); +} + + static int32_t mInnerJoinMergeCart(SMJoinMergeCtx* pCtx) { int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; SMJoinTableCtx* probe = pCtx->pJoin->probe; @@ -716,11 +737,11 @@ static FORCE_INLINE int32_t mInnerJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { static bool mInnerJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { - bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); + bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool buildGot = false; if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { - buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); + buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); } if (!probeGot) { @@ -827,8 +848,8 @@ static FORCE_INLINE int32_t mFullJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { } static bool mFullJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { - bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); - bool buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); + bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); + bool buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); if (!probeGot && !buildGot) { return false; @@ -1722,7 +1743,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) { } do { - if (!mLeftJoinRetrieve(pOperator, pJoin)) { + if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) { break; } @@ -1868,14 +1889,13 @@ int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindow eqRowsNum += grp.endIdx - grp.beginIdx + 1; if (pTable->blkRowIdx == pTable->blk->info.rows && !pTable->dsFetchDone) { - pTable->blk = getNextBlockFromDownstreamRemain(pOperator, pTable->downStreamIdx); + pTable->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, pTable); qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0); pTable->blkRowIdx = 0; pCtx->buildGrp.blk = pTable->blk; if (NULL == pTable->blk) { - pTable->dsFetchDone = true; break; } } else { @@ -2051,16 +2071,19 @@ int32_t mAsofLowerHandleGrpRemains(SMJoinWindowCtx* pCtx) { } static bool mAsofLowerRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { - bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); + bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool buildGot = false; do { if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { - buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); + buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); } if (!probeGot) { - mJoinSetDone(pOperator); + if (!pCtx->groupJoin || NULL == pJoin->probe->remainInBlk) { + mJoinSetDone(pOperator); + } + return false; } @@ -2100,6 +2123,10 @@ SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) { do { if (!mAsofLowerRetrieve(pOperator, pJoin, pCtx)) { + if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) { + continue; + } + break; } @@ -2149,7 +2176,7 @@ SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) { } } - if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) { + if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && (pJoin->build->dsFetchDone || (pCtx->groupJoin && NULL == pJoin->build->blk))) { pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx; pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx; pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1; @@ -2176,6 +2203,10 @@ _return: } int32_t mAsofGreaterTrimCacheBlk(SMJoinWindowCtx* pCtx) { + if (taosArrayGetSize(pCtx->cache.grps) <= 0) { + return TSDB_CODE_SUCCESS; + } + SMJoinGrpRows* pGrp = taosArrayGet(pCtx->cache.grps, 0); if (pGrp->blk == pCtx->cache.outBlk && pCtx->pJoin->build->blkRowIdx > 0) { MJ_ERR_RET(blockDataTrimFirstRows(pGrp->blk, pCtx->pJoin->build->blkRowIdx)); @@ -2197,40 +2228,38 @@ int32_t mAsofGreaterChkFillGrpCache(SMJoinWindowCtx* pCtx) { SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinWinCache* pCache = &pCtx->cache; int32_t grpNum = taosArrayGetSize(pCache->grps); - ASSERT(grpNum >= 1 && grpNum <= 2); - - SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, grpNum - 1); - if (pGrp->blk != pCache->outBlk) { - int32_t beginIdx = (1 == grpNum) ? build->blkRowIdx : 0; - MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, pGrp->blk, beginIdx, pGrp->blk->info.rows - beginIdx)); - if (1 == grpNum) { - pGrp->blk = pCache->outBlk; - pGrp->beginIdx = 0; - pGrp->readIdx = 0; - //pGrp->endIdx = pGrp->blk->info.rows - 1; - } else { - taosArrayPop(pCache->grps); - pGrp = taosArrayGet(pCache->grps, 0); - ASSERT(pGrp->blk == pCache->outBlk); - //pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx; + if (grpNum >= 1) { + SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, grpNum - 1); + if (pGrp->blk != pCache->outBlk) { + int32_t beginIdx = (1 == grpNum) ? build->blkRowIdx : 0; + MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, pGrp->blk, beginIdx, pGrp->blk->info.rows - beginIdx)); + if (1 == grpNum) { + pGrp->blk = pCache->outBlk; + pGrp->beginIdx = 0; + pGrp->readIdx = 0; + //pGrp->endIdx = pGrp->blk->info.rows - 1; + } else { + taosArrayPop(pCache->grps); + pGrp = taosArrayGet(pCache->grps, 0); + ASSERT(pGrp->blk == pCache->outBlk); + //pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx; + } + + //ASSERT((pGrp->endIdx - pGrp->beginIdx + 1) == pCtx->cache.rowNum); } - - //ASSERT((pGrp->endIdx - pGrp->beginIdx + 1) == pCtx->cache.rowNum); - } else { - ASSERT(grpNum == 1); - } - ASSERT(taosArrayGetSize(pCache->grps) == 1); - ASSERT(pGrp->blk->info.rows - pGrp->beginIdx == pCtx->cache.rowNum); + + ASSERT(taosArrayGetSize(pCache->grps) == 1); + ASSERT(pGrp->blk->info.rows - pGrp->beginIdx == pCtx->cache.rowNum); + } do { - build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, build->downStreamIdx); + build->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, build); qDebug("%s merge join %s table got block to fill grp, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0); build->blkRowIdx = 0; if (NULL == build->blk) { - build->dsFetchDone = true; break; } @@ -2337,13 +2366,12 @@ int32_t mAsofGreaterSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) { return TSDB_CODE_SUCCESS; } - pTable->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pTable->downStreamIdx); + pTable->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, pTable); qDebug("%s merge join %s table got block to skip eq ts, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0); pTable->blkRowIdx = 0; if (NULL == pTable->blk) { - pTable->dsFetchDone = true; return TSDB_CODE_SUCCESS; } @@ -2420,7 +2448,7 @@ int32_t mAsofGreaterProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* } static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { - bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); + bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool buildGot = false; do { @@ -2428,11 +2456,14 @@ static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* p pJoin->build->newBlk = false; MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build); ASSERT(taosArrayGetSize(pCtx->cache.grps) <= 1); - buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); + buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); } if (!probeGot) { - mJoinSetDone(pOperator); + if (!pCtx->groupJoin || NULL == pJoin->probe->remainInBlk) { + mJoinSetDone(pOperator); + } + return false; } @@ -2486,6 +2517,10 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) { do { if (!mAsofGreaterRetrieve(pOperator, pJoin, pCtx)) { + if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) { + continue; + } + break; } @@ -2529,7 +2564,7 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) { } } - if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) { + if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && (pJoin->build->dsFetchDone || (pCtx->groupJoin && NULL == pJoin->build->blk))) { pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx; pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx; pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1; @@ -2554,6 +2589,34 @@ _return: return pCtx->finBlk; } +void mAsofJoinGroupReset(SMJoinOperatorInfo* pJoin) { + SMJoinWindowCtx* pWin = &pJoin->ctx.windowCtx; + SMJoinWinCache* pCache = &pWin->cache; + + pWin->lastEqGrp = false; + pWin->lastProbeGrp = false; + pWin->eqPostDone = false; + pWin->lastTs = INT64_MIN; + + pCache->outRowIdx = 0; + pCache->rowNum = 0; + pCache->grpIdx = 0; + + if (pCache->grpsQueue) { + TSWAP(pCache->grps, pCache->grpsQueue); + } + + taosArrayClear(pCache->grps); + + if (pCache->outBlk) { + blockDataCleanup(pCache->outBlk); + } + + mJoinResetGroupTableCtx(pJoin->probe); + mJoinResetGroupTableCtx(pJoin->build); +} + + static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) { SMJoinWinCache* pCache = &pCtx->cache; int32_t grpNum = taosArrayGetSize(pCache->grps); @@ -2579,7 +2642,7 @@ static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) { } static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { - bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); + bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool buildGot = false; do { @@ -2588,11 +2651,14 @@ static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin mWinJoinCloneCacheBlk(pCtx); } - buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); + buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); } if (!probeGot) { - mJoinSetDone(pOperator); + if (!pCtx->groupJoin || NULL == pJoin->probe->remainInBlk) { + mJoinSetDone(pOperator); + } + return false; } @@ -2766,13 +2832,12 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) { } do { - build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pCtx->pJoin->build->downStreamIdx); + build->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, pCtx->pJoin->build); qDebug("%s merge join %s table got block to start win, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0); build->blkRowIdx = 0; if (NULL == build->blk) { - build->dsFetchDone = true; break; } @@ -2844,13 +2909,12 @@ int32_t mWinJoinMoveWinEnd(SMJoinWindowCtx* pCtx) { do { MJ_ERR_RET(mWinJoinCloneCacheBlk(pCtx)); - build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pCtx->pJoin->build->downStreamIdx); + build->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, pCtx->pJoin->build); qDebug("%s merge join %s table got block to start win, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0); build->blkRowIdx = 0; if (NULL == build->blk) { - build->dsFetchDone = true; break; } @@ -2892,6 +2956,10 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) { do { if (!mWinJoinRetrieve(pOperator, pJoin, pCtx)) { + if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) { + continue; + } + break; } @@ -2927,6 +2995,33 @@ _return: return pCtx->finBlk; } +void mWinJoinGroupReset(SMJoinOperatorInfo* pJoin) { + SMJoinWindowCtx* pWin = &pJoin->ctx.windowCtx; + SMJoinWinCache* pCache = &pWin->cache; + + pWin->lastEqGrp = false; + pWin->lastProbeGrp = false; + pWin->eqPostDone = false; + pWin->lastTs = INT64_MIN; + + pCache->outRowIdx = 0; + pCache->rowNum = 0; + pCache->grpIdx = 0; + + if (pCache->grpsQueue) { + TSWAP(pCache->grps, pCache->grpsQueue); + } + + taosArrayClear(pCache->grps); + + if (pCache->outBlk) { + blockDataCleanup(pCache->outBlk); + } + + mJoinResetGroupTableCtx(pJoin->probe); + mJoinResetGroupTableCtx(pJoin->build); +} + int32_t mJoinInitWindowCache(SMJoinWinCache* pCache, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { pCache->pageLimit = MJOIN_BLK_SIZE_LIMIT; pCache->colNum = pJoin->build->finNum; @@ -2955,9 +3050,8 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p pCtx->pJoin = pJoin; pCtx->lastTs = INT64_MIN; pCtx->seqWinGrp = pJoinNode->seqWinGroup; - pCtx->groupJoin = pJoinNode->grpJoin; if (pCtx->seqWinGrp) { - pCtx->seqGrpId = 1; + pJoin->outGrpId = 1; } switch (pJoinNode->subType) { @@ -2973,6 +3067,7 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p } else if (pCtx->greaterRowsAcq) { pJoin->joinFp = mAsofGreaterJoinDo; } + pJoin->grpResetFp = mAsofJoinGroupReset; break; case JOIN_STYPE_WIN: { SWindowOffsetNode* pOffsetNode = (SWindowOffsetNode*)pJoinNode->pWindowOffset; @@ -3023,6 +3118,7 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1; pJoin->subType = JOIN_STYPE_OUTER; pJoin->build->eqRowLimit = pCtx->jLimit; + pJoin->grpResetFp = mLeftJoinGroupReset; } else { pCtx->jLimit = -1; } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 4ac708504d..32cd4b113c 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -591,7 +591,7 @@ int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastB mJoinBuildEqGroups(pJoin->probe, timestamp, NULL, true); if (!lastBuildGrp) { - mJoinRetrieveEqGrpRows(pJoin->pOperator, pJoin->build, timestamp); + mJoinRetrieveEqGrpRows(pJoin, pJoin->build, timestamp); } else { pJoin->build->grpIdx = 0; } @@ -846,8 +846,92 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin pInfo->probe->type = E_JOIN_TB_PROBE; } +SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) { + int32_t dsIdx = pTable->downStreamIdx; + if (E_JOIN_TB_PROBE == pTable->type) { + if (pTable->remainInBlk) { + SSDataBlock* pTmp = pTable->remainInBlk; + pTable->remainInBlk = NULL; + (*pJoin->grpResetFp)(pJoin); + pTable->lastInGid = pTmp->info.id.groupId; + return pTmp; + } + + if (pTable->dsFetchDone) { + return NULL; + } + + SSDataBlock* pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, dsIdx); + if (NULL == pTmp) { + pTable->dsFetchDone = true; + return NULL; + } + + if (0 == pTable->lastInGid) { + pTable->lastInGid = pTmp->info.id.groupId; + return pTmp; + } + + if (pTable->lastInGid == pTmp->info.id.groupId) { + return pTmp; + } + + pTable->remainInBlk = pTmp; + return NULL; + } + + SMJoinTableCtx* pProbe = pJoin->probe; + ASSERT(pProbe->lastInGid); + + while (true) { + if (pTable->remainInBlk) { + if (pTable->remainInBlk->info.id.groupId == pProbe->lastInGid) { + SSDataBlock* pTmp = pTable->remainInBlk; + pTable->remainInBlk = NULL; + pTable->lastInGid = pTmp->info.id.groupId; + return pTmp; + } + + if (pTable->remainInBlk->info.id.groupId > pProbe->lastInGid) { + return NULL; + } + + pTable->remainInBlk = NULL; + } + + if (pTable->dsFetchDone) { + return NULL; + } + + SSDataBlock* pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, dsIdx); + if (NULL == pTmp) { + pTable->dsFetchDone = true; + return NULL; + } + + pTable->remainInBlk = pTmp; + } + + return NULL; +} + +static FORCE_INLINE SSDataBlock* mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) { + if (pTable->dsFetchDone) { + return NULL; + } + + SSDataBlock* pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTable->downStreamIdx); + if (NULL == pTmp) { + pTable->dsFetchDone = true; + } + + return pTmp; +} + + static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { - pJoin->retrieveCtx.grpRetrieve = pJoinNode->grpJoin; + pJoin->ctx.mergeCtx.groupJoin = pJoinNode->grpJoin; + pJoin->retrieveFp = pJoinNode->grpJoin ? mJoinGrpRetrieveImpl : mJoinRetrieveImpl; if ((JOIN_STYPE_ASOF == pJoin->subType && (ASOF_LOWER_ROW_INCLUDED(pJoinNode->asofOpType) || ASOF_GREATER_ROW_INCLUDED(pJoinNode->asofOpType))) || (JOIN_STYPE_WIN == pJoin->subType)) { @@ -865,6 +949,10 @@ static void mJoinDestroyCtx(SMJoinOperatorInfo* pJoin) { return mJoinDestroyMergeCtx(pJoin); } +bool mJoinIsDone(SOperatorInfo* pOperator) { + return (OP_EXEC_DONE == pOperator->status); +} + void mJoinSetDone(SOperatorInfo* pOperator) { setOperatorCompleted(pOperator); if (pOperator->pDownstreamGetParams) { @@ -875,21 +963,15 @@ void mJoinSetDone(SOperatorInfo* pOperator) { } } -bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb) { - if (pTb->dsFetchDone) { - return (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows) ? false : true; - } - +bool mJoinRetrieveBlk(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb) { if (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows) { - (*ppBlk) = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTb->downStreamIdx); + (*ppBlk) = (*pJoin->retrieveFp)(pJoin, pTb); pTb->dsInitDone = true; qDebug("%s merge join %s table got %" PRId64 " rows block", GET_TASKID(pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pTb->type), (*ppBlk) ? (*ppBlk)->info.rows : 0); *pIdx = 0; - if (NULL == (*ppBlk)) { - pTb->dsFetchDone = true; - } else { + if (NULL != (*ppBlk)) { pTb->newBlk = true; } @@ -1046,19 +1128,18 @@ _return: } -int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableCtx* pTable, int64_t timestamp) { +int32_t mJoinRetrieveEqGrpRows(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable, int64_t timestamp) { bool wholeBlk = false; mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, true); while (wholeBlk && !pTable->dsFetchDone) { - pTable->blk = getNextBlockFromDownstreamRemain(pOperator, pTable->downStreamIdx); - qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0); + pTable->blk = (*pJoin->retrieveFp)(pJoin, pTable); + qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0); pTable->blkRowIdx = 0; if (NULL == pTable->blk) { - pTable->dsFetchDone = true; break; } @@ -1238,13 +1319,23 @@ int32_t mJoinCreateBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable return TSDB_CODE_SUCCESS; } +void mJoinResetGroupTableCtx(SMJoinTableCtx* pCtx) { + pCtx->blk = NULL; + pCtx->blkRowIdx = 0; + pCtx->newBlk = false; + + mJoinDestroyCreatedBlks(pCtx->createdBlks); + tSimpleHashClear(pCtx->pGrpHash); +} + void mJoinResetTableCtx(SMJoinTableCtx* pCtx) { pCtx->dsInitDone = false; pCtx->dsFetchDone = false; + pCtx->lastInGid = 0; + pCtx->remainInBlk = NULL; - mJoinDestroyCreatedBlks(pCtx->createdBlks); - tSimpleHashClear(pCtx->pGrpHash); + mJoinResetGroupTableCtx(pCtx); } void mJoinResetMergeCtx(SMJoinMergeCtx* pCtx) { @@ -1413,6 +1504,7 @@ int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) { break; case JOIN_STYPE_WIN: pJoin->joinFp = mWinJoinDo; + pJoin->grpResetFp = mWinJoinGroupReset; break; default: break; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 42c36078ed..8d435806eb 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -485,6 +485,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { CLONE_NODE_FIELD(pFullOnCond); CLONE_NODE_LIST_FIELD(pLeftEqNodes); CLONE_NODE_LIST_FIELD(pRightEqNodes); + COPY_SCALAR_FIELD(allEqTags); COPY_SCALAR_FIELD(isSingleTableJoin); COPY_SCALAR_FIELD(hasSubQuery); COPY_SCALAR_FIELD(seqWinGroup); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 73fbbc6b5a..f6d35daad8 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -852,8 +852,7 @@ static int32_t pdcJoinSplitPrimEqCond(SOptimizeContext* pCxt, SJoinLogicNode* pJ pPrimKeyEqCond = pJoin->pFullOnCond; pJoinOnCond = NULL; } else { - planError("unexcepted conds in fullOnCond, type:%s", nodesNodeName(nodeType(pJoin->pFullOnCond))); - code = TSDB_CODE_PLAN_INTERNAL_ERROR; + return TSDB_CODE_SUCCESS; } if (TSDB_CODE_SUCCESS == code) { @@ -924,11 +923,13 @@ static int32_t pdcJoinPartLogicEqualOnCond(SJoinLogicNode* pJoin) { code = nodesListMakeAppend(&pTagEqOnConds, nodesCloneNode(pCond)); } else { code = nodesListMakeAppend(&pColEqOnConds, nodesCloneNode(pCond)); + pJoin->allEqTags = false; } } else if (allTags) { code = nodesListMakeAppend(&pTagOnConds, nodesCloneNode(pCond)); } else { code = nodesListMakeAppend(&pColOnConds, nodesCloneNode(pCond)); + pJoin->allEqTags = false; } if (code) { @@ -978,6 +979,9 @@ static int32_t pdcJoinPartEqualOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJ pJoin->pTagEqCond = NULL; return TSDB_CODE_SUCCESS; } + + pJoin->allEqTags = true; + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pFullOnCond) && LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pFullOnCond))->condType) { return pdcJoinPartLogicEqualOnCond(pJoin); @@ -989,11 +993,13 @@ static int32_t pdcJoinPartEqualOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJ pJoin->pTagEqCond = nodesCloneNode(pJoin->pFullOnCond); } else { pJoin->pColEqCond = nodesCloneNode(pJoin->pFullOnCond); + pJoin->allEqTags = false; } } else if (allTags) { pJoin->pTagOnCond = nodesCloneNode(pJoin->pFullOnCond); } else { pJoin->pColOnCond = nodesCloneNode(pJoin->pFullOnCond); + pJoin->allEqTags = false; } return TSDB_CODE_SUCCESS; @@ -1274,7 +1280,7 @@ static int32_t pdcDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { } } - if (TSDB_CODE_SUCCESS == code && NULL != pJoin->pFullOnCond) { + if (TSDB_CODE_SUCCESS == code && NULL != pJoin->pFullOnCond && !IS_WINDOW_JOIN(pJoin->subType)) { code = pdcJoinSplitPrimEqCond(pCxt, pJoin); } @@ -4540,10 +4546,37 @@ static int32_t grpJoinOptInsertPartitionNode(SLogicNode* pJoin) { return code; } -static int32_t grpJoinOptRewriteGroupJoin(SOptimizeContext* pCxt, SLogicNode* pJoin, SLogicSubplan* pLogicSubplan) { - int32_t code = grpJoinOptInsertPartitionNode(pJoin); +static int32_t grpJoinOptPartByTags(SLogicNode* pNode) { + int32_t code = TSDB_CODE_SUCCESS; + SNode* pChild = NULL; + SNode* pNew = NULL; + bool leftChild = true; + SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode; + FOREACH(pChild, pNode->pChildren) { + if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild)) { + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + + SScanLogicNode* pScan = (SScanLogicNode*)pChild; + if (leftChild) { + nodesListMakeStrictAppendList(&pScan->pGroupTags, nodesCloneList(pJoin->pLeftEqNodes)); + leftChild = false; + } else { + nodesListMakeStrictAppendList(&pScan->pGroupTags, nodesCloneList(pJoin->pRightEqNodes)); + } + + pScan->groupSort = true; + pScan->groupOrderScan = true; + } + + return code; +} + +static int32_t grpJoinOptRewriteGroupJoin(SOptimizeContext* pCxt, SLogicNode* pNode, SLogicSubplan* pLogicSubplan) { + SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode; + int32_t code = (pJoin->allEqTags && !pJoin->hasSubQuery) ? grpJoinOptPartByTags(pNode) : grpJoinOptInsertPartitionNode(pNode); if (TSDB_CODE_SUCCESS == code) { - ((SJoinLogicNode*)pJoin)->grpJoin = true; + pJoin->grpJoin = true; pCxt->optimized = true; } return code; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 6746829f7e..016b6054f6 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -1405,7 +1405,7 @@ static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubp //if (pJoin->node.dynamicOp) { // code = TSDB_CODE_SUCCESS; //} else { - code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pChild, false); + code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pChild, pJoin->grpJoin ? true : false); //} } else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild)) { code = stbSplSplitJoinNodeImpl(pCxt, pSubplan, (SJoinLogicNode*)pChild); diff --git a/tests/script/tsim/join/left_asof_join.sim b/tests/script/tsim/join/left_asof_join.sim index 7c676a8a39..57ad8ca922 100644 --- a/tests/script/tsim/join/left_asof_join.sim +++ b/tests/script/tsim/join/left_asof_join.sim @@ -477,8 +477,437 @@ if $data61 != @23-11-17 16:29:03.000@ then return -1 endi +sql select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts > b.ts and a.col1=b.col1 jlimit 2 order by a.ts +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != NULL then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != NULL then + return -1 +endi +if $data30 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:03.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left asof join sta b on a.ts > b.ts and a.col1=b.col1 jlimit 2 order by a.ts +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != NULL then + return -1 +endi +if $data10 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data11 != NULL then + return -1 +endi +if $data20 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data21 != NULL then + return -1 +endi +if $data30 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data40 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data41 != NULL then + return -1 +endi +if $data50 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data51 != NULL then + return -1 +endi +if $data60 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data61 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data70 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data71 != NULL then + return -1 +endi + +sql select a.ts, b.ts from sta a left asof join sta b on a.ts >= b.ts and a.col1=b.col1 jlimit 2 order by a.ts, b.ts; +if $rows != 10 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data40 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data41 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data50 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data51 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data60 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data61 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data70 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data71 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data80 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data81 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data90 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data91 != @23-11-17 16:29:05.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left asof join sta b on a.ts < b.ts and a.col1=b.col1 jlimit 2 order by a.ts, b.ts +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != NULL then + return -1 +endi +if $data10 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data11 != NULL then + return -1 +endi +if $data20 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data31 != NULL then + return -1 +endi +if $data40 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data41 != NULL then + return -1 +endi +if $data50 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data51 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data60 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data61 != NULL then + return -1 +endi +if $data70 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data71 != NULL then + return -1 +endi + +sql select a.ts, b.ts from sta a left asof join sta b on a.ts <= b.ts and a.col1=b.col1 jlimit 2 order by a.ts, b.ts +if $rows != 10 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data40 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data41 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data50 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data51 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data60 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data61 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data70 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data71 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data80 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data81 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data90 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data91 != @23-11-17 16:29:05.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left asof join sta b on a.ts = b.ts and a.col1=b.col1 jlimit 2 order by a.ts, b.ts +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data40 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data41 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data50 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data51 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data60 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data61 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data70 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data71 != @23-11-17 16:29:05.000@ then + return -1 +endi + +sql select a.t1, a.ts, b.ts from sta a left asof join sta b on a.ts <= b.ts and a.t1=b.t1 jlimit 2 order by a.t1, a.ts, b.ts; +if $rows != 14 then + return -1 +endi + +sql select a.ts, b.ts from sta a left asof join sta b on a.ts <= b.ts and a.t1=b.t1 and a.col1=b.col1 jlimit 2 order by a.t1, a.ts, b.ts; +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data40 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data41 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data50 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data51 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data60 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data61 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data70 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data71 != @23-11-17 16:29:05.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left asof join sta b on a.ts <= b.ts and a.t1=b.t1 and a.col1=b.col1 jlimit 2 where a.ts > '2023-11-17 16:29:00.000' order by a.t1, a.ts, b.ts; +if $rows != 6 then + return -1 +endi +if $data00 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data40 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data41 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data50 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data51 != @23-11-17 16:29:05.000@ then + return -1 +endi + +sql select count(*) from sta a left asof join sta b on a.ts <= b.ts and a.t1=b.t1 and a.col1=b.col1 jlimit 2 where a.ts > '2023-11-17 16:29:00.000'; +if $rows != 1 then + return -1 +endi +if $data00 != 6 then + return -1 +endi + sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts >=b.ts and a.col1=a.ts; sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts >=b.ts and a.col1 > 1; +sql_error select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts > b.ts and a.col1 > b.col1 jlimit 2 order by a.ts; +sql_error select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts > b.ts and a.col1 = 1 jlimit 2 order by a.ts; +sql_error select a.t1, a.ts, b.ts from sta a left asof join sta b on a.ts <= b.ts and a.t1=b.t1 and a.col1=b.col1 jlimit 2 having(a.ts>0) order by a.t1, a.ts, b.ts; +sql_error select count(*) from sta a left asof join sta b on a.ts <= b.ts and a.t1=b.t1 and a.col1=b.col1 jlimit 2 where a.ts > '2023-11-17 16:29:00.000' slimit 1; diff --git a/tests/script/tsim/join/left_win_join.sim b/tests/script/tsim/join/left_win_join.sim index 8ddc7b681e..3d9932a69c 100644 --- a/tests/script/tsim/join/left_win_join.sim +++ b/tests/script/tsim/join/left_win_join.sim @@ -384,6 +384,194 @@ if $data21 != 4 then return -1 endi +sql select a.ts, b.ts from sta a left window join sta b on a.col1=b.col1 window_offset(-1s, 1s) order by a.col1, a.ts; +if $rows != 12 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:01.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left window join sta b on a.t1=b.t1 window_offset(-1s, 1s) order by a.t1, a.ts, b.ts; +if $rows != 14 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data40 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data41 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data50 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data51 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data60 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data61 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data70 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data71 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data80 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data81 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data90 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data91 != @23-11-17 16:29:01.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left window join sta b on a.t1=b.t1 and a.col1=b.col1 window_offset(-1s, 1s) order by a.t1, a.ts, b.ts; +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data40 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data41 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data50 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data51 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data60 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data61 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data70 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data71 != @23-11-17 16:29:05.000@ then + return -1 +endi + +sql select a.ts, b.ts from sta a left window join sta b on a.t1=b.t1 and a.col1=b.col1 window_offset(-2s, -1s) order by a.t1, a.ts, b.ts; +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != NULL then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != NULL then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != NULL then + return -1 +endi +if $data30 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data31 != NULL then + return -1 +endi +if $data40 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data41 != NULL then + return -1 +endi +if $data50 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data51 != NULL then + return -1 +endi +if $data60 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data61 != NULL then + return -1 +endi +if $data70 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data71 != NULL then + return -1 +endi + sql_error select a.col1, count(*) from sta a left window join sta b window_offset(-1s, 1s); sql_error select b.ts, count(*) from sta a left window join sta b window_offset(-1s, 1s); sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1s, 1s) having(b.ts > 0); @@ -392,3 +580,4 @@ sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1s, sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1s, 1s) having(a.ts > "2023-11-17 16:29:00.000"); sql_error select a.ts from sta a left window join sta b window_offset(-1s, 1s) where b.col1 between 2 and 4 having(a.ts > 0) order by count(*); sql_error select a.ts, count(*),last(b.ts) from sta a left window join sta b window_offset(-1s, 1s) slimit 1; +sql_error select a.ts, b.ts from sta a left window join sta b on a.t1=1 window_offset(-1s, 1s) order by a.t1, a.ts;