From 3fecb387cc9aa5990b4c7bf6a79dcc22ef7c0336 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 19 Jan 2024 18:03:16 +0800 Subject: [PATCH] enh: support asof join --- source/libs/executor/inc/mergejoin.h | 14 + source/libs/executor/src/mergejoin.c | 237 +++++---- source/libs/executor/src/mergejoinoperator.c | 16 +- source/libs/executor/test/joinTests.cpp | 93 +++- source/libs/nodes/src/nodesCodeFuncs.c | 21 + source/libs/nodes/src/nodesMsgFuncs.c | 21 + source/libs/planner/src/planOptimizer.c | 2 +- tests/script/tsim/join/inner_join.sim | 1 + tests/script/tsim/join/join.sim | 2 + tests/script/tsim/join/left_asof_join.sim | 488 +++++++++++++++++++ 10 files changed, 776 insertions(+), 119 deletions(-) create mode 100644 tests/script/tsim/join/left_asof_join.sim diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index e8653c3022..a8e28d56b9 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -318,6 +318,7 @@ typedef struct SMJoinOperatorInfo { (_cache)->rowNum += (_blk)->info.rows; \ pGrp->blk = (_blk); \ pGrp->beginIdx = 0; \ + pGrp->endIdx = (_blk)->info.rows - 1; \ } while (0) #define MJOIN_RESTORE_TB_BLK(_cache, _tb) \ @@ -332,6 +333,15 @@ typedef struct SMJoinOperatorInfo { } \ } while (0) +#define MJOIN_SAVE_TB_BLK(_cache, _tb) \ + do { \ + ASSERT(taosArrayGetSize((_cache)->grps) >= 1); \ + SMJoinGrpRows* pGrp = taosArrayGet((_cache)->grps, 0); \ + ASSERT(pGrp->blk == (_tb)->blk); \ + pGrp->beginIdx = (_tb)->blkRowIdx; \ + pGrp->readIdx = pGrp->beginIdx; \ + } while (0) + #define MJOIN_POP_TB_BLK(_cache) \ do { \ SMJoinGrpRows* pGrp = taosArrayGet((_cache)->grps, 0); \ @@ -378,6 +388,9 @@ typedef struct SMJoinOperatorInfo { } while (0) + +void mJoinDestroyMergeCtx(SMJoinOperatorInfo* pJoin); +void mJoinDestroyWindowCtx(SMJoinOperatorInfo* pJoin); int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode); int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode); SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator); @@ -408,6 +421,7 @@ int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColum int32_t mJoinFilterAndKeepSingleRow(SSDataBlock* pBlock, SFilterInfo* pFilterInfo); int32_t mJoinFilterAndNoKeepRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index f6a5307c06..5c98e0cab9 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -1714,30 +1714,42 @@ _return: } -static bool mAsofJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { - bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); - bool buildGot = false; +int32_t mAsofJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) { + SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); - do { - if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { - buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); + if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) { + return TSDB_CODE_SUCCESS; + } + + pGrp->beginIdx = pTable->blkRowIdx; + pGrp->readIdx = pTable->blkRowIdx; + + pTable->blkRowIdx++; + char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1); + if (timestamp != *(int64_t*)pEndVal) { + for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) { + char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx); + if (timestamp == *(int64_t*)pNextVal) { + continue; + } + + pGrp->endIdx = pTable->blkRowIdx - 1; + return TSDB_CODE_SUCCESS; } - - if (!probeGot) { - mJoinSetDone(pOperator); - return false; - } - - break; - } while (true); + } - pCtx->probeGrp.blk = pJoin->probe->blk; - pCtx->buildGrp.blk = pJoin->build->blk; + pGrp->endIdx = pTable->blk->info.rows - 1; + pTable->blkRowIdx = pTable->blk->info.rows; - return true; + if (wholeBlk) { + *wholeBlk = true; + } + + return TSDB_CODE_SUCCESS; } -int32_t mAsofJoinCalcRowNum(SMJoinWinCache* pCache, int64_t jLimit, int32_t newRows, int32_t* evictRows) { + +int32_t mAsofLowerCalcRowNum(SMJoinWinCache* pCache, int64_t jLimit, int32_t newRows, int32_t* evictRows) { if (pCache->outBlk->info.rows <= 0) { *evictRows = 0; return TMIN(jLimit, newRows); @@ -1757,10 +1769,10 @@ int32_t mAsofJoinCalcRowNum(SMJoinWinCache* pCache, int64_t jLimit, int32_t newR return newRows; } -int32_t mAsofJoinAddRowsToCache(SMJoinWindowCtx* pCtx, SMJoinGrpRows* pGrp, bool fromBegin) { +int32_t mAsofLowerAddRowsToCache(SMJoinWindowCtx* pCtx, SMJoinGrpRows* pGrp, bool fromBegin) { int32_t evictRows = 0; SMJoinWinCache* pCache = &pCtx->cache; - int32_t rows = mAsofJoinCalcRowNum(pCache, pCtx->jLimit, pGrp->endIdx - pGrp->beginIdx + 1, &evictRows); + int32_t rows = mAsofLowerCalcRowNum(pCache, pCtx->jLimit, pGrp->endIdx - pGrp->beginIdx + 1, &evictRows); if (evictRows > 0) { MJ_ERR_RET(blockDataTrimFirstRows(pCache->outBlk, evictRows)); } @@ -1769,40 +1781,8 @@ int32_t mAsofJoinAddRowsToCache(SMJoinWindowCtx* pCtx, SMJoinGrpRows* pGrp, bool return blockDataMergeNRows(pCache->outBlk, pGrp->blk, startIdx, rows); } -int32_t mAsofJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) { - SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); - if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) { - return TSDB_CODE_SUCCESS; - } - - pGrp->beginIdx = pTable->blkRowIdx; - pGrp->readIdx = pTable->blkRowIdx; - - char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1); - if (timestamp != *(int64_t*)pEndVal) { - for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) { - char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx); - if (timestamp == *(int64_t*)pNextVal) { - continue; - } - - return TSDB_CODE_SUCCESS; - } - } - - pGrp->endIdx = pTable->blk->info.rows - 1; - pTable->blkRowIdx = pTable->blk->info.rows; - - if (wholeBlk) { - *wholeBlk = true; - } - - return TSDB_CODE_SUCCESS; -} - - -int32_t mAsofJoinAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp) { +int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp) { int64_t eqRowsNum = 0; SMJoinGrpRows grp = {.blk = pTable->blk}; @@ -1833,7 +1813,7 @@ int32_t mAsofJoinAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindowC } if (eqRowsNum < pCtx->jLimit) { - MJ_ERR_RET(mAsofJoinAddRowsToCache(pCtx, &grp, false)); + MJ_ERR_RET(mAsofLowerAddRowsToCache(pCtx, &grp, false)); } eqRowsNum += grp.endIdx - grp.beginIdx + 1; @@ -1874,6 +1854,7 @@ int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) { MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp)); pCtx->grpRemains = false; + pCtx->cache.outRowIdx = 0; return TSDB_CODE_SUCCESS; } @@ -1886,6 +1867,7 @@ int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) { } pCtx->grpRemains = false; + pCtx->cache.outRowIdx = 0; return TSDB_CODE_SUCCESS; } @@ -1897,8 +1879,8 @@ int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) { buildGrp.readIdx = 0; MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp)); rowsLeft -= grpNum * pCtx->cache.outBlk->info.rows; - pCtx->cache.outRowIdx = 0; probeGrp->readIdx += grpNum; + probeGrp->endIdx = probeEndIdx; continue; } } @@ -1935,7 +1917,7 @@ int32_t mAsofLowerDumpUpdateEqRows(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJ if (!pCtx->eqPostDone && !lastBuildGrp) { pCtx->eqPostDone = true; - return mAsofJoinAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs); + return mAsofLowerAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs); } return TSDB_CODE_SUCCESS; @@ -1943,7 +1925,7 @@ int32_t mAsofLowerDumpUpdateEqRows(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJ if (!pCtx->eqPostDone && !lastBuildGrp) { pCtx->eqPostDone = true; - MJ_ERR_RET(mAsofJoinAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs)); + MJ_ERR_RET(mAsofLowerAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs)); } return mAsofLowerDumpGrpCache(pCtx); @@ -1953,8 +1935,10 @@ int32_t mAsofLowerProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool SMJoinOperatorInfo* pJoin = pCtx->pJoin; pCtx->lastEqGrp = true; - pCtx->eqPostDone = false; - + if (!lastBuildGrp) { + pCtx->eqPostDone = false; + } + MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, NULL, &pCtx->probeGrp)); return mAsofLowerDumpUpdateEqRows(pCtx, pJoin, lastBuildGrp); @@ -2002,9 +1986,7 @@ int32_t mAsofLowerProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* p pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx; pCtx->probeGrp.endIdx = pCtx->probeGrp.beginIdx; - MJ_ERR_RET(mAsofJoinAddRowsToCache(pCtx, &pCtx->buildGrp, false)); - - return mAsofLowerDumpGrpCache(pCtx); + return mAsofLowerAddRowsToCache(pCtx, &pCtx->buildGrp, false); } int32_t mAsofLowerHandleGrpRemains(SMJoinWindowCtx* pCtx) { @@ -2028,6 +2010,11 @@ static bool mAsofLowerRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJo break; } while (true); + if (buildGot && NULL == pCtx->cache.outBlk) { + pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false); + blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit); + } + pCtx->probeGrp.blk = pJoin->probe->blk; pCtx->buildGrp.blk = pJoin->build->blk; @@ -2147,6 +2134,7 @@ int32_t mAsofGreaterDumpGrpCache(SMJoinWindowCtx* pCtx) { buildGrp->readIdx = buildGrp->beginIdx; } + cache->grpIdx = 0; pCtx->grpRemains = false; return TSDB_CODE_SUCCESS; } @@ -2203,18 +2191,26 @@ int32_t mAsofGreaterChkFillGrpCache(SMJoinWindowCtx* pCtx) { int32_t grpNum = taosArrayGetSize(pCache->grps); ASSERT(grpNum >= 1 && grpNum <= 2); - SSDataBlock* pBlk = NULL; SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, grpNum - 1); if (pGrp->blk != pCache->outBlk) { - pBlk = pGrp->blk; - taosArrayPop(pCache->grps); + int32_t beginIdx = (1 == grpNum) ? build->blkRowIdx : 0; + MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, pGrp->blk, beginIdx, pGrp->blk->info.rows - beginIdx)); + if (1 == grpNum) { + pGrp->blk = pCache->outBlk; + pGrp->beginIdx = 0; + pGrp->readIdx = 0; + //pGrp->endIdx = pGrp->blk->info.rows - 1; + } else { + taosArrayPop(pCache->grps); + pGrp = taosArrayGet(pCache->grps, 0); + ASSERT(pGrp->blk == pCache->outBlk); + //pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx; + } + + //ASSERT((pGrp->endIdx - pGrp->beginIdx + 1) == pCtx->cache.rowNum); } do { - if (NULL != pBlk) { - MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, pBlk, build->blkRowIdx, pBlk->info.rows - build->blkRowIdx)); - } - build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, build->downStreamIdx); qDebug("%s merge join %s table got block to fill grp, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0); @@ -2225,8 +2221,14 @@ int32_t mAsofGreaterChkFillGrpCache(SMJoinWindowCtx* pCtx) { break; } - MJOIN_PUSH_BLK_TO_CACHE(pCache, build->blk); - pBlk = build->blk; + if ((pCache->rowNum + build->blk->info.rows) >= pCtx->jLimit) { + MJOIN_PUSH_BLK_TO_CACHE(pCache, build->blk); + break; + } + + MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, build->blk, 0, build->blk->info.rows)); + + //pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx; } while (pCache->rowNum < pCtx->jLimit); MJOIN_RESTORE_TB_BLK(pCache, build); @@ -2234,10 +2236,35 @@ int32_t mAsofGreaterChkFillGrpCache(SMJoinWindowCtx* pCtx) { return TSDB_CODE_SUCCESS; } +void mAsofGreaterUpdateBuildGrpEndIdx(SMJoinWindowCtx* pCtx) { + int32_t grpNum = taosArrayGetSize(pCtx->cache.grps); + if (grpNum <= 0) { + return; + } + + SMJoinGrpRows* pGrp = taosArrayGet(pCtx->cache.grps, 0); + if (1 == grpNum) { + pGrp->endIdx = pGrp->beginIdx + TMIN(pGrp->blk->info.rows - pGrp->beginIdx, pCtx->jLimit) - 1; + return; + } + + ASSERT(pCtx->jLimit > (pGrp->blk->info.rows - pGrp->beginIdx)); + pGrp->endIdx = pGrp->blk->info.rows - 1; + + int64_t remainRows = pCtx->jLimit - (pGrp->endIdx - pGrp->beginIdx + 1); + + pGrp = taosArrayGet(pCtx->cache.grps, 1); + pGrp->endIdx = pGrp->beginIdx + TMIN(pGrp->blk->info.rows, remainRows) - 1; +} + int32_t mAsofGreaterFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) { if (!lastBuildGrp) { MJ_ERR_RET(mAsofGreaterChkFillGrpCache(pCtx)); } + + MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build); + + mAsofGreaterUpdateBuildGrpEndIdx(pCtx); return mAsofGreaterDumpGrpCache(pCtx); } @@ -2249,7 +2276,8 @@ int32_t mAsofGreaterSkipEqRows(SMJoinTableCtx* pTable, int64_t timestamp, bool* *wholeBlk = false; return TSDB_CODE_SUCCESS; } - + + pTable->blkRowIdx++; char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1); if (timestamp != *(int64_t*)pEndVal) { for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) { @@ -2257,7 +2285,8 @@ int32_t mAsofGreaterSkipEqRows(SMJoinTableCtx* pTable, int64_t timestamp, bool* if (timestamp == *(int64_t*)pNextVal) { continue; } - + + *wholeBlk = false; return TSDB_CODE_SUCCESS; } } @@ -2269,7 +2298,6 @@ int32_t mAsofGreaterSkipEqRows(SMJoinTableCtx* pTable, int64_t timestamp, bool* int32_t mAsofGreaterSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) { SMJoinWinCache* cache = &pCtx->cache; - int32_t grpNum = taosArrayGetSize(cache->grps); SMJoinTableCtx* pTable = pCtx->pJoin->build; bool wholeBlk = false; @@ -2314,7 +2342,6 @@ int32_t mAsofGreaterProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bo SMJoinOperatorInfo* pJoin = pCtx->pJoin; pCtx->lastEqGrp = true; - pCtx->cache.grpIdx = 0; MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, NULL, &pCtx->probeGrp)); @@ -2338,8 +2365,6 @@ int32_t mAsofGreaterProcessLowerGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* p break; } - pCtx->cache.grpIdx = 0; - return mAsofGreaterFillDumpGrpCache(pCtx, false); } @@ -2382,7 +2407,7 @@ static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* p return false; } - if (buildGot && pCtx->asofGreaterRow) { + if (buildGot) { SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot); SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot); if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) { @@ -2395,22 +2420,21 @@ static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* p } while (true); if (buildGot) { + if (NULL == pCtx->cache.outBlk) { + pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false); + blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit); + } + MJOIN_PUSH_BLK_TO_CACHE(&pCtx->cache, pJoin->build->blk); MJOIN_RESTORE_TB_BLK(&pCtx->cache, pJoin->build); } pCtx->probeGrp.blk = pJoin->probe->blk; - pCtx->buildGrp.blk = pJoin->build->blk; return true; } -int32_t mAsofGreaterHandleGrpRemains(SMJoinWindowCtx* pCtx) { - return mAsofGreaterDumpGrpCache(pCtx); -} - - SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoin = pOperator->info; SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx; @@ -2423,7 +2447,7 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) { blockDataCleanup(pCtx->finBlk); if (pCtx->grpRemains) { - MJ_ERR_JRET(mAsofGreaterHandleGrpRemains(pCtx)); + MJ_ERR_JRET(mAsofGreaterDumpGrpCache(pCtx)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -2501,33 +2525,36 @@ _return: } -int32_t mJoinInitWinCache(SMJoinWinCache* pCache, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { +int32_t mJoinInitWindowCache(SMJoinWinCache* pCache, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { pCache->pageLimit = MJOIN_BLK_SIZE_LIMIT; - pCache->colNum = pJoin->build->finNum; - pCache->outBlk = createOneDataBlock(pCtx->finBlk, false); - if (NULL == pCache->outBlk) { + + pCache->grps = taosArrayInit(2, sizeof(SMJoinGrpRows)); + if (NULL == pCache->grps) { return TSDB_CODE_OUT_OF_MEMORY; } - pCache->outBlk->info.capacity = pCtx->jLimit; - - SMJoinTableCtx* build = pJoin->build; - for (int32_t i = 0; i < pCache->colNum; ++i) { - SColumnInfoData* pCol = taosArrayGet(pCache->outBlk->pDataBlock, build->finCols[i].dstSlot); - doEnsureCapacity(pCol, NULL, pCtx->jLimit, false); - } - + //taosArrayReserve(pTable->eqGrps, 1); + return TSDB_CODE_SUCCESS; } +void mJoinDestroyWindowCtx(SMJoinOperatorInfo* pJoin) { + SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx; + + pCtx->finBlk = blockDataDestroy(pCtx->finBlk); + pCtx->cache.outBlk = blockDataDestroy(pCtx->cache.outBlk); + + taosArrayDestroy(pCtx->cache.grps); +} + int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx; pCtx->pJoin = pJoin; pCtx->asofOpType = pJoinNode->asofOpType; pCtx->asofEqRow = ASOF_EQ_ROW_INCLUDED(pCtx->asofOpType); - pCtx->asofLowerRow = ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType); - pCtx->asofGreaterRow = ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType); + pCtx->asofLowerRow = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType); + pCtx->asofGreaterRow = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType); pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1; if (pCtx->asofLowerRow) { @@ -2545,18 +2572,26 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p pCtx->blkThreshold = pCtx->finBlk->info.capacity * 0.9; - MJ_ERR_RET(mJoinInitWinCache(&pCtx->cache, pJoin, pCtx)); + MJ_ERR_RET(mJoinInitWindowCache(&pCtx->cache, pJoin, pCtx)); return TSDB_CODE_SUCCESS; } +void mJoinDestroyMergeCtx(SMJoinOperatorInfo* pJoin) { + SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; + + pCtx->finBlk = blockDataDestroy(pCtx->finBlk); + pCtx->midBlk = blockDataDestroy(pCtx->midBlk); +} + + int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; pCtx->pJoin = pJoin; pCtx->lastEqTs = INT64_MIN; pCtx->hashCan = pJoin->probe->keyNum > 0; - if (JOIN_STYPE_ASOF == pJoin->subType) { + if (JOIN_STYPE_ASOF == pJoinNode->subType) { pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1; pJoin->subType = JOIN_STYPE_OUTER; } else { diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index a028790524..c00b2a5d29 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -744,7 +744,7 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi memcpy(&pTable->inputStat, pStat, sizeof(*pStat)); pTable->eqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows)); - taosArrayReserve(pTable->eqGrps, 1); + //taosArrayReserve(pTable->eqGrps, 1); if (E_JOIN_TB_BUILD == pTable->type) { pTable->createdBlks = taosArrayInit(8, POINTER_BYTES); @@ -765,7 +765,7 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi pTable->noKeepEqGrpRows = (JOIN_STYPE_ANTI == pJoin->subType && NULL == pJoin->pFPreFilter); pTable->multiEqGrpRows = !((JOIN_STYPE_SEMI == pJoin->subType || JOIN_STYPE_ANTI == pJoin->subType) && NULL == pJoin->pFPreFilter); pTable->multiRowsGrp = !((JOIN_STYPE_SEMI == pJoin->subType || JOIN_STYPE_ANTI == pJoin->subType) && NULL == pJoin->pPreFilter); - if (JOIN_STYPE_ASOF == pJoin->subType) { + if (JOIN_STYPE_ASOF == pJoinNode->subType) { pTable->eqRowLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1; } } else { @@ -819,6 +819,14 @@ static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* return mJoinInitMergeCtx(pJoin, pJoinNode); } +static void mJoinDestroyCtx(SMJoinOperatorInfo* pJoin) { + if (JOIN_STYPE_ASOF == pJoin->subType || JOIN_STYPE_WIN == pJoin->subType) { + return mJoinDestroyWindowCtx(pJoin); + } + + return mJoinDestroyMergeCtx(pJoin); +} + void mJoinSetDone(SOperatorInfo* pOperator) { setOperatorCompleted(pOperator); if (pOperator->pDownstreamGetParams) { @@ -1290,8 +1298,8 @@ void destroyMergeJoinTableCtx(SMJoinTableCtx* pTable) { void destroyMergeJoinOperator(void* param) { SMJoinOperatorInfo* pJoin = (SMJoinOperatorInfo*)param; - pJoin->ctx.mergeCtx.finBlk = blockDataDestroy(pJoin->ctx.mergeCtx.finBlk); - pJoin->ctx.mergeCtx.midBlk = blockDataDestroy(pJoin->ctx.mergeCtx.midBlk); + + mJoinDestroyCtx(pJoin); if (pJoin->pFPreFilter != NULL) { filterFreeInfo(pJoin->pFPreFilter); diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index 4eb5c41116..de5d501d04 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -77,6 +77,7 @@ enum { #define RIGHT_TABLE_COLS 0x2 #define ALL_TABLE_COLS (LEFT_TABLE_COLS | RIGHT_TABLE_COLS) +#define JT_MAX_JLIMIT 3 #define JT_KEY_SOLT_ID (MAX_SLOT_NUM - 1) int32_t jtInputColType[MAX_SLOT_NUM] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_BIGINT}; @@ -112,6 +113,7 @@ typedef struct { int32_t colCond; int32_t joinType; int32_t subType; + int64_t jLimit; int32_t leftTotalRows; int32_t rightTotalRows; @@ -175,6 +177,8 @@ typedef struct { typedef struct { EJoinType joinType; EJoinSubType subType; + int32_t asofOp; + int64_t jLimit; int32_t cond; bool filter; bool asc; @@ -773,26 +777,33 @@ void createBlockDescNode(SDataBlockDescNode** ppNode) { *ppNode = pDesc; } -SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(EJoinType type, EJoinSubType sub, int32_t cond, bool filter, bool asc) { +SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(SJoinTestParam* param) { SSortMergeJoinPhysiNode* p = (SSortMergeJoinPhysiNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN); - p->joinType = type; - p->subType = sub; + p->joinType = param->joinType; + p->subType = param->subType; + p->asofOpType = param->asofOp; + if (param->jLimit > 1 || taosRand() % 2) { + SLimitNode* limitNode = (SLimitNode*)nodesMakeNode(QUERY_NODE_LIMIT); + limitNode->limit = param->jLimit; + p->pJLimit = (SNode*)limitNode; + } p->leftPrimSlotId = 0; p->rightPrimSlotId = 0; - p->node.inputTsOrder = asc ? ORDER_ASC : ORDER_DESC; + p->node.inputTsOrder = param->asc ? ORDER_ASC : ORDER_DESC; - jtCtx.joinType = type; - jtCtx.subType = sub; - jtCtx.asc = asc; - jtCtx.leftColOnly = (JOIN_TYPE_LEFT == type && JOIN_STYPE_SEMI == sub); - jtCtx.rightColOnly = (JOIN_TYPE_RIGHT == type && JOIN_STYPE_SEMI == sub); + jtCtx.joinType = param->joinType; + jtCtx.subType = param->subType; + jtCtx.asc = param->asc; + jtCtx.jLimit = param->jLimit; + jtCtx.leftColOnly = (JOIN_TYPE_LEFT == param->joinType && JOIN_STYPE_SEMI == param->subType); + jtCtx.rightColOnly = (JOIN_TYPE_RIGHT == param->joinType && JOIN_STYPE_SEMI == param->subType); - createColCond(p, cond); - createFilterStart(p, filter); + createColCond(p, param->cond); + createFilterStart(p, param->filter); createTargetSlotList(p); createColEqCondEnd(p); createColOnCondEnd(p); - createFilterEnd(p, filter); + createFilterEnd(p, param->filter); updateColRowInfo(); createBlockDescNode(&p->node.pOutputDataBlockDesc); @@ -2197,7 +2208,7 @@ void handleTestDone() { void runSingleTest(char* caseName, SJoinTestParam* param) { bool contLoop = true; - SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param->joinType, param->subType, param->cond, param->filter, param->asc); + SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param); createDummyBlkList(200, 200, 200, 200, 20); while (contLoop) { @@ -2744,6 +2755,62 @@ TEST(leftAntiJoin, fullCondTest) { #endif #endif +#if 1 +#if 1 +TEST(leftAsofJoin, noCondGreaterThanTest) { + SJoinTestParam param; + char* caseName = "leftAntiJoin:noCondGreaterThanTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_LEFT; + param.subType = JOIN_STYPE_ASOF; + param.cond = TEST_NO_COND; + param.asofOp = OP_TYPE_GREATER_THAN; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1; + + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); +} +#endif + +#if 1 +TEST(leftAsofJoin, eqCondTest) { + SJoinTestParam param; + char* caseName = "leftAntiJoin:eqCondTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_LEFT; + param.subType = JOIN_STYPE_ANTI; + param.cond = TEST_EQ_COND; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); + handleCaseEnd(); +} +#endif +#endif + int main(int argc, char** argv) { diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 762d860cf0..7472bb62f3 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2119,6 +2119,9 @@ static const char* jkJoinPhysiPlanJoinType = "JoinType"; static const char* jkJoinPhysiPlanSubType = "SubType"; static const char* jkJoinPhysiPlanWinOffset = "WindowOffset"; static const char* jkJoinPhysiPlanJoinLimit = "JoinLimit"; +static const char* jkJoinPhysiPlanAsofOp = "AsofOp"; +static const char* jkJoinPhysiPlanLeftPrimExpr = "LeftPrimExpr"; +static const char* jkJoinPhysiPlanRightPrimExpr = "RightPrimExpr"; static const char* jkJoinPhysiPlanLeftPrimSlotId = "LeftPrimSlotId"; static const char* jkJoinPhysiPlanRightPrimSlotId = "RightPrimSlotId"; static const char* jkJoinPhysiPlanLeftEqCols = "LeftEqCols"; @@ -2151,6 +2154,15 @@ static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkJoinPhysiPlanJoinLimit, nodeToJson, pNode->pJLimit); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanAsofOp, pNode->asofOpType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinPhysiPlanLeftPrimExpr, nodeToJson, pNode->leftPrimExpr); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinPhysiPlanRightPrimExpr, nodeToJson, pNode->rightPrimExpr); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanLeftPrimSlotId, pNode->leftPrimSlotId); } @@ -2204,6 +2216,15 @@ static int32_t jsonToPhysiMergeJoinNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkJoinPhysiPlanJoinLimit, &pNode->pJLimit); } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkJoinPhysiPlanAsofOp, pNode->asofOpType, code); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinPhysiPlanLeftPrimExpr, &pNode->leftPrimExpr); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinPhysiPlanRightPrimExpr, &pNode->rightPrimExpr); + } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkJoinPhysiPlanLeftPrimSlotId, pNode->leftPrimSlotId, code); } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index d13e62e1f5..0f6783eab4 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2427,6 +2427,9 @@ enum { PHY_SORT_MERGE_JOIN_CODE_SUB_TYPE, PHY_SORT_MERGE_JOIN_CODE_WINDOW_OFFSET, PHY_SORT_MERGE_JOIN_CODE_JOIN_LIMIT, + PHY_SORT_MERGE_JOIN_CODE_ASOF_OP, + PHY_SORT_MERGE_JOIN_CODE_LEFT_PRIM_EXPR, + PHY_SORT_MERGE_JOIN_CODE_RIGHT_PRIM_EXPR, PHY_SORT_MERGE_JOIN_CODE_LEFT_PRIM_SLOT_ID, PHY_SORT_MERGE_JOIN_CODE_RIGHT_PRIM_SLOT_ID, PHY_SORT_MERGE_JOIN_CODE_LEFT_EQ_COLS, @@ -2456,6 +2459,15 @@ static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_JOIN_LIMIT, nodeToMsg, pNode->pJLimit); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_SORT_MERGE_JOIN_CODE_ASOF_OP, pNode->asofOpType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_LEFT_PRIM_EXPR, nodeToMsg, pNode->leftPrimExpr); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_RIGHT_PRIM_EXPR, nodeToMsg, pNode->rightPrimExpr); + } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeI32(pEncoder, PHY_SORT_MERGE_JOIN_CODE_LEFT_PRIM_SLOT_ID, pNode->leftPrimSlotId); } @@ -2515,6 +2527,15 @@ static int32_t msgToPhysiMergeJoinNode(STlvDecoder* pDecoder, void* pObj) { case PHY_SORT_MERGE_JOIN_CODE_JOIN_LIMIT: code = msgToNodeFromTlv(pTlv, (void**)&pNode->pJLimit); break; + case PHY_SORT_MERGE_JOIN_CODE_ASOF_OP: + code = tlvDecodeI32(pTlv, &pNode->asofOpType); + break; + case PHY_SORT_MERGE_JOIN_CODE_LEFT_PRIM_EXPR: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->leftPrimExpr); + break; + case PHY_SORT_MERGE_JOIN_CODE_RIGHT_PRIM_EXPR: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->rightPrimExpr); + break; case PHY_SORT_MERGE_JOIN_CODE_LEFT_PRIM_SLOT_ID: code = tlvDecodeI32(pTlv, &pNode->leftPrimSlotId); break; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index e261416afe..580c03c542 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -727,7 +727,7 @@ static int32_t pdcPushDownCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild } static bool pdcJoinIsPrim(SNode* pNode, SSHashObj* pTables) { - if (QUERY_NODE_COLUMN != nodeType(pNode) || QUERY_NODE_FUNCTION != nodeType(pNode)) { + if (QUERY_NODE_COLUMN != nodeType(pNode) && QUERY_NODE_FUNCTION != nodeType(pNode)) { return false; } diff --git a/tests/script/tsim/join/inner_join.sim b/tests/script/tsim/join/inner_join.sim index 44a4bb6851..1b33291d77 100644 --- a/tests/script/tsim/join/inner_join.sim +++ b/tests/script/tsim/join/inner_join.sim @@ -9,6 +9,7 @@ if $data00 != 1 then return -1 endi if $data01 != 1 then + print $data01 return -1 endi if $data10 != 1 then diff --git a/tests/script/tsim/join/join.sim b/tests/script/tsim/join/join.sim index 745eb86752..ef5750a901 100644 --- a/tests/script/tsim/join/join.sim +++ b/tests/script/tsim/join/join.sim @@ -65,6 +65,7 @@ run tsim/join/left_semi_join.sim run tsim/join/right_semi_join.sim run tsim/join/left_anti_join.sim run tsim/join/right_anti_join.sim +run tsim/join/left_asof_join.sim print ================== restart server to commit data into disk system sh/exec.sh -n dnode1 -s stop -x SIGINT @@ -79,5 +80,6 @@ run tsim/join/left_semi_join.sim run tsim/join/right_semi_join.sim run tsim/join/left_anti_join.sim run tsim/join/right_anti_join.sim +run tsim/join/left_asof_join.sim system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/join/left_asof_join.sim b/tests/script/tsim/join/left_asof_join.sim new file mode 100644 index 0000000000..f23abdf07e --- /dev/null +++ b/tests/script/tsim/join/left_asof_join.sim @@ -0,0 +1,488 @@ +sql connect +sql use test0; + +sql_error select a.col1, b.col1 from sta a left asof join sta b on a.ts = b.ts and a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by a.col1, b.col1; +sql select a.col1, b.col1 from sta a left asof join sta b on a.ts = b.ts order by a.col1, b.col1; +if $rows != 8 then + return -1 +endi +sql select a.col1, b.col1 from sta a left asof join sta b on a.ts >= b.ts order by a.col1, b.col1; +if $rows != 8 then + return -1 +endi +sql select a.col1, b.col1 from sta a left asof join sta b on a.ts = b.ts where a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by a.col1, b.col1; +if $rows != 2 then + return -1 +endi +if $data00 != 1 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +if $data10 != 2 then + return -1 +endi +if $data11 != 1 then + return -1 +endi + +sql select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts = b.ts; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != NULL then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data31 != NULL then + return -1 +endi + + +sql select a.ts, b.ts from tba1 a left asof join tba2 b on b.ts = a.ts; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != NULL then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data31 != NULL then + return -1 +endi + +sql select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts >= b.ts ; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:03.000@ then + return -1 +endi + +sql select a.ts, b.ts from tba1 a left asof join tba2 b on b.ts <= a.ts ; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:03.000@ then + return -1 +endi + +sql select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts > b.ts ; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != NULL then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:03.000@ then + return -1 +endi + +sql select a.ts, b.ts from tba1 a left asof join tba2 b on b.ts < a.ts ; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != NULL then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:03.000@ then + return -1 +endi + +sql select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts <= b.ts ; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:05.000@ then + return -1 +endi + +sql select a.ts, b.ts from tba1 a left asof join tba2 b on b.ts >= a.ts ; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:05.000@ then + return -1 +endi + + +sql select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts < b.ts ; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:05.000@ then + return -1 +endi + + +sql select a.ts, b.ts from tba1 a left asof join tba2 b on b.ts > a.ts ; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:05.000@ then + return -1 +endi + +sql select a.ts, b.ts from tba2 a left asof join tba1 b on a.ts >= b.ts; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:04.000@ then + return -1 +endi + +sql select a.ts, b.ts from tba2 a left asof join tba1 b on a.ts > b.ts; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != NULL then + return -1 +endi +if $data10 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:04.000@ then + return -1 +endi + +sql select a.ts, b.ts from tba2 a left asof join tba1 b on a.ts <= b.ts; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data31 != NULL then + return -1 +endi + +sql select a.ts, b.ts from tba2 a left asof join tba1 b on a.ts < b.ts; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data31 != NULL then + return -1 +endi + +sql select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts > b.ts jlimit 2 +if $rows != 7 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != NULL then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data40 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data41 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data50 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data51 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data60 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data61 != @23-11-17 16:29:03.000@ then + return -1 +endi + + + +