From 32b16bd6e7a96cce52d1fc5acda27ba84053ce0d Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 29 Jan 2024 16:58:08 +0800 Subject: [PATCH] enh: fix asof bugs --- source/libs/executor/inc/mergejoin.h | 23 +-- source/libs/executor/src/mergejoin.c | 128 ++++++++++---- source/libs/executor/test/joinTests.cpp | 224 ++++++++++++++++++------ 3 files changed, 269 insertions(+), 106 deletions(-) diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 9b4a4c1fde..4270cd5d61 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -197,14 +197,6 @@ typedef enum { E_CACHE_INBLK } SMJoinCacheMode; -typedef struct SAsofJoinGrpRows { - SSDataBlock* blk; - bool clonedBlk; - int32_t blkRowIdx; - int32_t readIdx; -} SAsofJoinGrpRows; - - typedef struct SMJoinWinCache { int32_t pageLimit; int32_t outRowIdx; @@ -228,9 +220,9 @@ typedef struct SMJoinWindowCtx { // KEEP IT FIRST int32_t asofOpType; - bool asofLowerRow; - bool asofEqRow; - bool asofGreaterRow; + bool lowerRowsAcq; + bool eqRowsAcq; + bool greaterRowsAcq; bool eqPostDone; int64_t lastTs; @@ -335,11 +327,12 @@ typedef struct SMJoinOperatorInfo { #define MJOIN_SAVE_TB_BLK(_cache, _tb) \ do { \ - ASSERT(taosArrayGetSize((_cache)->grps) >= 1); \ SMJoinGrpRows* pGrp = taosArrayGet((_cache)->grps, 0); \ - ASSERT(pGrp->blk == (_tb)->blk); \ - pGrp->beginIdx = (_tb)->blkRowIdx; \ - pGrp->readIdx = pGrp->beginIdx; \ + if (NULL != pGrp) { \ + ASSERT(pGrp->blk == (_tb)->blk); \ + pGrp->beginIdx = (_tb)->blkRowIdx; \ + pGrp->readIdx = pGrp->beginIdx; \ + } \ } while (0) #define MJOIN_POP_TB_BLK(_cache) \ diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index 9cc1b9a9ab..c532d101f9 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -1784,9 +1784,11 @@ int32_t mAsofLowerAddRowsToCache(SMJoinWindowCtx* pCtx, SMJoinGrpRows* pGrp, boo int32_t mAsofLowerAddEqRowsToCache(struct 
SOperatorInfo* pOperator, SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp) { int64_t eqRowsNum = 0; - SMJoinGrpRows grp = {.blk = pTable->blk}; + SMJoinGrpRows grp; do { + grp.blk = pTable->blk; + SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) { @@ -1813,7 +1815,8 @@ int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindow } if (eqRowsNum < pCtx->jLimit) { - MJ_ERR_RET(mAsofLowerAddRowsToCache(pCtx, &grp, false)); + grp.endIdx = grp.beginIdx + TMIN(grp.endIdx - grp.beginIdx + 1, pCtx->jLimit - eqRowsNum) - 1; + MJ_ERR_RET(mAsofLowerAddRowsToCache(pCtx, &grp, true)); } eqRowsNum += grp.endIdx - grp.beginIdx + 1; @@ -1823,6 +1826,7 @@ int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindow qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? 
pTable->blk->info.rows : 0); pTable->blkRowIdx = 0; + pCtx->buildGrp.blk = pTable->blk; if (NULL == pTable->blk) { pTable->dsFetchDone = true; @@ -1893,6 +1897,8 @@ int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) { MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp)); rowsLeft -= grpRemainRows; pCtx->cache.outRowIdx = 0; + probeGrp->readIdx++; + probeGrp->endIdx = probeEndIdx; continue; } @@ -1908,40 +1914,45 @@ int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) { return TSDB_CODE_SUCCESS; } -int32_t mAsofLowerDumpUpdateEqRows(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, bool lastBuildGrp) { - if (!pCtx->asofEqRow) { +int32_t mAsofLowerDumpUpdateEqRows(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, bool lastBuildGrp, bool skipEqPost) { + if (!pCtx->eqRowsAcq) { MJ_ERR_RET(mAsofLowerDumpGrpCache(pCtx)); + + pCtx->lastEqGrp = true; if (pCtx->grpRemains) { return TSDB_CODE_SUCCESS; } - - if (!pCtx->eqPostDone && !lastBuildGrp) { - pCtx->eqPostDone = true; - return mAsofLowerAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs); - } - - return TSDB_CODE_SUCCESS; } - if (!pCtx->eqPostDone && !lastBuildGrp) { + if (!pCtx->eqPostDone && !lastBuildGrp && (pCtx->eqRowsAcq || !skipEqPost)) { pCtx->eqPostDone = true; MJ_ERR_RET(mAsofLowerAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs)); } - return mAsofLowerDumpGrpCache(pCtx); + if (!pCtx->eqRowsAcq) { + return TSDB_CODE_SUCCESS; + } + + MJ_ERR_RET(mAsofLowerDumpGrpCache(pCtx)); + + pCtx->lastEqGrp = true; + + return TSDB_CODE_SUCCESS; } int32_t mAsofLowerProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) { SMJoinOperatorInfo* pJoin = pCtx->pJoin; - pCtx->lastEqGrp = true; if (!lastBuildGrp) { pCtx->eqPostDone = false; } - - MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, NULL, &pCtx->probeGrp)); - return mAsofLowerDumpUpdateEqRows(pCtx, pJoin, lastBuildGrp); + bool wholeBlk = false; + 
MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, &wholeBlk, &pCtx->probeGrp)); + + MJ_ERR_RET(mAsofLowerDumpUpdateEqRows(pCtx, pJoin, lastBuildGrp, wholeBlk)); + + return TSDB_CODE_SUCCESS; } @@ -1990,7 +2001,7 @@ int32_t mAsofLowerProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* p } int32_t mAsofLowerHandleGrpRemains(SMJoinWindowCtx* pCtx) { - return (pCtx->lastEqGrp) ? mAsofLowerDumpUpdateEqRows(pCtx, pCtx->pJoin, false) : mAsofLowerDumpGrpCache(pCtx); + return (pCtx->lastEqGrp) ? mAsofLowerDumpUpdateEqRows(pCtx, pCtx->pJoin, false, true) : mAsofLowerDumpGrpCache(pCtx); } static bool mAsofLowerRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { @@ -2062,6 +2073,12 @@ SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) { } } + if (pCtx->lastEqGrp && !pCtx->eqPostDone) { + pCtx->eqPostDone = true; + MJ_ERR_JRET(mAsofLowerAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs)); + MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); + } + while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { if (probeTs == buildTs) { pCtx->lastTs = probeTs; @@ -2092,6 +2109,7 @@ SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) { pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1; MJ_ERR_JRET(mAsofLowerDumpGrpCache(pCtx)); + pCtx->lastEqGrp = false; pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; @@ -2208,7 +2226,12 @@ int32_t mAsofGreaterChkFillGrpCache(SMJoinWindowCtx* pCtx) { } //ASSERT((pGrp->endIdx - pGrp->beginIdx + 1) == pCtx->cache.rowNum); + } else { + ASSERT(grpNum == 1); } + + ASSERT(taosArrayGetSize(pCache->grps) == 1); + ASSERT(pGrp->blk->info.rows - pGrp->beginIdx == pCtx->cache.rowNum); do { build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, build->downStreamIdx); @@ -2227,6 +2250,7 @@ int32_t mAsofGreaterChkFillGrpCache(SMJoinWindowCtx* pCtx) { } MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, build->blk, 
0, build->blk->info.rows)); + pCache->rowNum += build->blk->info.rows; //pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx; } while (pCache->rowNum < pCtx->jLimit); @@ -2259,17 +2283,16 @@ void mAsofGreaterUpdateBuildGrpEndIdx(SMJoinWindowCtx* pCtx) { int32_t mAsofGreaterFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) { if (!lastBuildGrp) { + MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build); MJ_ERR_RET(mAsofGreaterChkFillGrpCache(pCtx)); } - MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build); - mAsofGreaterUpdateBuildGrpEndIdx(pCtx); return mAsofGreaterDumpGrpCache(pCtx); } -int32_t mAsofGreaterSkipEqRows(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk) { +int32_t mAsofGreaterSkipEqRows(SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk) { SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) { @@ -2278,17 +2301,22 @@ int32_t mAsofGreaterSkipEqRows(SMJoinTableCtx* pTable, int64_t timestamp, bool* } pTable->blkRowIdx++; + pCtx->cache.rowNum--; + char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1); if (timestamp != *(int64_t*)pEndVal) { for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) { char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx); if (timestamp == *(int64_t*)pNextVal) { + pCtx->cache.rowNum--; continue; } *wholeBlk = false; return TSDB_CODE_SUCCESS; } + } else { + pCtx->cache.rowNum -= (pTable->blk->info.rows - pTable->blkRowIdx); } *wholeBlk = true; @@ -2303,7 +2331,7 @@ int32_t mAsofGreaterSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) { do { do { - MJ_ERR_RET(mAsofGreaterSkipEqRows(pTable, timestamp, &wholeBlk)); + MJ_ERR_RET(mAsofGreaterSkipEqRows(pCtx, pTable, timestamp, &wholeBlk)); if (!wholeBlk) { return TSDB_CODE_SUCCESS; } @@ -2311,6 +2339,9 @@ int32_t mAsofGreaterSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) { 
MJOIN_POP_TB_BLK(cache); MJOIN_RESTORE_TB_BLK(cache, pTable); } while (!MJOIN_BUILD_TB_ROWS_DONE(pTable)); + + ASSERT(pCtx->cache.rowNum == 0); + ASSERT(taosArrayGetSize(pCtx->cache.grps) == 0); pTable->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pTable->downStreamIdx); qDebug("%s merge join %s table got block to skip eq ts, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0); @@ -2330,7 +2361,7 @@ int32_t mAsofGreaterSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) { int32_t mAsofGreaterUpdateDumpEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) { - if (!pCtx->asofEqRow && !lastBuildGrp) { + if (!pCtx->eqRowsAcq && !lastBuildGrp) { MJ_ERR_RET(mAsofGreaterSkipAllEqRows(pCtx, timestamp)); } @@ -2368,16 +2399,16 @@ int32_t mAsofGreaterProcessLowerGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* p return mAsofGreaterFillDumpGrpCache(pCtx, false); } -int32_t mAsofGreaterProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { +int32_t mAsofGreaterProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData** pCol, int64_t* probeTs, int64_t* buildTs) { do { - MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pJoin->build); + MJOIN_GET_TB_CUR_TS(*pCol, *buildTs, pJoin->build); if (!PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) { break; } pCtx->cache.rowNum--; while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { - MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pJoin->build); + MJOIN_GET_TB_CUR_TS(*pCol, *buildTs, pJoin->build); if (PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) { pCtx->cache.rowNum--; continue; @@ -2388,6 +2419,7 @@ int32_t mAsofGreaterProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* MJOIN_POP_TB_BLK(&pCtx->cache); MJOIN_RESTORE_TB_BLK(&pCtx->cache, pJoin->build); + MJOIN_GET_TB_COL_TS(*pCol, *buildTs, pJoin->build); } while 
(!MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)); return TSDB_CODE_SUCCESS; @@ -2398,7 +2430,10 @@ static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* p bool buildGot = false; do { - if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { + if ((probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) && pCtx->cache.rowNum < pCtx->jLimit) { + pJoin->build->newBlk = false; + MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build); + ASSERT(taosArrayGetSize(pCtx->cache.grps) <= 1); buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); } @@ -2412,6 +2447,7 @@ static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* p SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot); if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) { pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; + MJOIN_POP_TB_BLK(&pCtx->cache); continue; } } @@ -2419,7 +2455,7 @@ static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* p break; } while (true); - if (buildGot) { + if (buildGot && pJoin->build->newBlk) { if (NULL == pCtx->cache.outBlk) { pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false); blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit); @@ -2491,7 +2527,7 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) { if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) { MJ_ERR_JRET(mAsofGreaterProcessLowerGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs)); } else { - MJ_ERR_JRET(mAsofGreaterProcessGreaterGrp(pCtx, pJoin, pBuildCol, &probeTs, &buildTs)); + MJ_ERR_JRET(mAsofGreaterProcessGreaterGrp(pCtx, pJoin, &pBuildCol, &probeTs, &buildTs)); } if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { @@ -2551,16 +2587,34 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p SMJoinWindowCtx* pCtx = 
&pJoin->ctx.windowCtx; pCtx->pJoin = pJoin; - pCtx->asofOpType = pJoinNode->asofOpType; - pCtx->asofEqRow = ASOF_EQ_ROW_INCLUDED(pCtx->asofOpType); - pCtx->asofLowerRow = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType); - pCtx->asofGreaterRow = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType); pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1; - if (pCtx->asofLowerRow) { - pJoin->joinFp = mAsofLowerJoinDo; - } else if (pCtx->asofGreaterRow) { - pJoin->joinFp = mAsofGreaterJoinDo; + switch (pJoinNode->subType) { + case JOIN_STYPE_ASOF: + pCtx->asofOpType = pJoinNode->asofOpType; + pCtx->eqRowsAcq = ASOF_EQ_ROW_INCLUDED(pCtx->asofOpType); + pCtx->lowerRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType); + pCtx->greaterRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType); + + if (pCtx->lowerRowsAcq) { + pJoin->joinFp = mAsofLowerJoinDo; + } else if (pCtx->greaterRowsAcq) { + pJoin->joinFp = mAsofGreaterJoinDo; + } + break; + case JOIN_STYPE_WIN: + pCtx->eqRowsAcq = ASOF_EQ_ROW_INCLUDED(pCtx->asofOpType); + pCtx->lowerRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType); + pCtx->greaterRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? 
ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType); + + if (pCtx->lowerRowsAcq) { + pJoin->joinFp = mAsofLowerJoinDo; + } else if (pCtx->greaterRowsAcq) { + pJoin->joinFp = mAsofGreaterJoinDo; + } + break; + default: + break; } if (pJoinNode->node.inputTsOrder != ORDER_DESC) { diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index 1cd904bc48..a2b3134441 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -66,7 +66,7 @@ enum { }; #define COL_DISPLAY_WIDTH 18 -#define JT_MAX_LOOP 30000 +#define JT_MAX_LOOP 5000 #define LEFT_BLK_ID 0 #define RIGHT_BLK_ID 1 @@ -858,18 +858,20 @@ void appendAsofLeftEachResGrps(char* leftInRow, int32_t rightOffset, int32_t rig memcpy(jtCtx.resColBuf + jtCtx.resColOffset[c], leftInRow + jtCtx.inColOffset[c], tDataTypes[jtInputColType[c]].bytes); } } - - for (int32_t r = rightOffset; r < rightRows; ++r) { - bool* rightFilterOut = taosArrayGet(jtCtx.rightFilterOut, r); + + int32_t endIdx = TMIN(rightRows, taosArrayGetSize(jtCtx.rightRowsList) - rightOffset) + rightOffset; + for (int32_t r = rightOffset; r < endIdx; ++r) { + bool* rightFilterOut = (bool*)taosArrayGet(jtCtx.rightFilterOut, r); if (*rightFilterOut) { continue; } - char* rightResRows = taosArrayGet(jtCtx.rightRowsList, r); + char* rightResRows = (char*)taosArrayGet(jtCtx.rightRowsList, r); for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { if (jtCtx.resColList[MAX_SLOT_NUM + c]) { - if (*(bool*)(rightResRows + MAX_SLOT_NUM + c)) { + if (*(bool*)(rightResRows + c)) { *(bool*)(jtCtx.resColBuf + MAX_SLOT_NUM + c) = true; + memset(jtCtx.resColBuf + jtCtx.resColOffset[MAX_SLOT_NUM + c], 0, tDataTypes[jtInputColType[c]].bytes); } else { *(bool*)(jtCtx.resColBuf + MAX_SLOT_NUM + c) = false; memcpy(jtCtx.resColBuf + jtCtx.resColOffset[MAX_SLOT_NUM + c], rightResRows + jtCtx.inColOffset[c], tDataTypes[jtInputColType[c]].bytes); @@ -908,14 +910,16 @@ void 
appendAllAsofResRows() { int32_t leftRows = taosArrayGetSize(jtCtx.leftRowsList); int32_t rightRows = taosArrayGetSize(jtCtx.rightRowsList); if (rightRows <= 0) { - for (int32_t i = 0; i < leftRows; ++i) { - char* leftInRow = taosArrayGet(jtCtx.leftRowsList, i); - appendAsofLeftNonMatchGrp(leftInRow); - } + if (0 == jtCtx.rightFilterNum) { + for (int32_t i = 0; i < leftRows; ++i) { + char* leftInRow = (char*)taosArrayGet(jtCtx.leftRowsList, i); + appendAsofLeftNonMatchGrp(leftInRow); + } + } } else { ASSERT(rightRows <= jtCtx.jLimit); for (int32_t i = 0; i < leftRows; ++i) { - char* leftInRow = taosArrayGet(jtCtx.leftRowsList, i); + char* leftInRow = (char*)taosArrayGet(jtCtx.leftRowsList, i); appendAsofLeftEachResGrps(leftInRow, 0, rightRows); } } @@ -933,11 +937,11 @@ void chkAppendAsofGreaterResRows(bool forceOut) { int32_t leftRows = taosArrayGetSize(jtCtx.leftRowsList); int32_t i = 0; for (; i < leftRows; ++i) { - char* leftRow = taosArrayGet(jtCtx.leftRowsList, i); + char* leftRow = (char*)taosArrayGet(jtCtx.leftRowsList, i); int64_t* leftTs = (int64_t*)(leftRow + jtCtx.inColOffset[JT_PRIM_TS_SLOT_ID]); bool append = false; for (int32_t r = rightOffset; r < rightRows; ++r) { - char* rightRow = taosArrayGet(jtCtx.rightRowsList, r); + char* rightRow = (char*)taosArrayGet(jtCtx.rightRowsList, r); int64_t* rightTs = (int64_t*)(rightRow + jtCtx.inColOffset[JT_PRIM_TS_SLOT_ID]); if ((*leftTs > *rightTs) || (*leftTs == *rightTs && OP_TYPE_LOWER_THAN == jtCtx.asofOpType)) { rightOffset++; @@ -961,8 +965,10 @@ void chkAppendAsofGreaterResRows(bool forceOut) { if (!forceOut) { break; } - - appendAsofLeftNonMatchGrp(leftRow); + + if (0 == jtCtx.rightFilterNum) { + appendAsofLeftNonMatchGrp(leftRow); + } } } @@ -1000,31 +1006,20 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { bool keepRes = false; bool keepInput = false; if (blkId == LEFT_BLK_ID) { - if ((jtCtx.joinType == JOIN_TYPE_LEFT || jtCtx.joinType == JOIN_TYPE_FULL) && jtCtx.subType 
!= JOIN_STYPE_SEMI) { + if ((jtCtx.joinType == JOIN_TYPE_LEFT || jtCtx.joinType == JOIN_TYPE_FULL) && (jtCtx.subType != JOIN_STYPE_SEMI && jtCtx.subType != JOIN_STYPE_ASOF)) { keepRes = true; } peerOffset = MAX_SLOT_NUM; } else { - if ((jtCtx.joinType == JOIN_TYPE_RIGHT || jtCtx.joinType == JOIN_TYPE_FULL) && jtCtx.subType != JOIN_STYPE_SEMI) { + if ((jtCtx.joinType == JOIN_TYPE_RIGHT || jtCtx.joinType == JOIN_TYPE_FULL) && (jtCtx.subType != JOIN_STYPE_SEMI && jtCtx.subType != JOIN_STYPE_ASOF)) { keepRes = true; } tableOffset = MAX_SLOT_NUM; } - if (JOIN_STYPE_ASOF == jtCtx.subType && jtCtx.asofOpType != OP_TYPE_EQUAL) { - keepInput = true; - if (blkId == LEFT_BLK_ID) { - if (NULL == jtCtx.leftRowsList) { - jtCtx.leftRowsList = taosArrayInit(jtCtx.jLimit, jtCtx.inColSize); - } - pTableRows = jtCtx.leftRowsList; - } else { - if (NULL == jtCtx.rightRowsList) { - jtCtx.rightRowsList = taosArrayInit(jtCtx.jLimit, jtCtx.inColSize); - jtCtx.rightFilterOut = taosArrayInit(jtCtx.jLimit, sizeof(bool)); - } - pTableRows = jtCtx.rightRowsList; - } + if (JOIN_STYPE_ASOF == jtCtx.subType) { + keepInput = jtCtx.asofOpType != OP_TYPE_EQUAL ? true : (blkId == LEFT_BLK_ID); + pTableRows = (blkId == LEFT_BLK_ID) ? jtCtx.leftRowsList : jtCtx.rightRowsList; } int32_t filterNum = (blkId == LEFT_BLK_ID) ? jtCtx.leftFilterNum : jtCtx.rightFilterNum; @@ -1049,12 +1044,15 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { taosArrayPush((blkId == LEFT_BLK_ID) ? jtCtx.leftBlkList : jtCtx.rightBlkList, ppBlk); } - filterOut = (peerFilterNum > 0) ? true : false; + filterOut = (peerFilterNum > 0 && jtCtx.subType != JOIN_STYPE_ASOF) ? 
true : false; if (!filterOut) { memset(jtCtx.resColBuf, 0, jtCtx.resColSize); + if (keepInput) { + memset(jtCtx.inColBuf, 0, jtCtx.inColSize); + } } - addToRowList = false; + addToRowList = true; for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { switch (jtInputColType[c]) { @@ -1071,7 +1069,7 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { tmpInt = (taosRand() % 2) ? INT_FILTER_VALUE + jtCtx.grpOffset[c] + taosRand() % vRange : INT_FILTER_VALUE - jtCtx.grpOffset[c] - taosRand() % vRange; pData = (char*)&tmpInt; isNull = false; - if (filterNum && filterCol[c] && tmpInt <= INT_FILTER_VALUE) { + if (!filterOut && filterNum && filterCol[c] && tmpInt <= INT_FILTER_VALUE) { filterOut = true; } } else { @@ -1085,7 +1083,7 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { tmpBigint = (taosRand() % 2) ? BIGINT_FILTER_VALUE + jtCtx.curKeyOffset++ : BIGINT_FILTER_VALUE - jtCtx.curKeyOffset++; pData = (char*)&tmpBigint; isNull = false; - if (filterNum && filterCol[c] && tmpBigint <= BIGINT_FILTER_VALUE) { + if (!filterOut && filterNum && filterCol[c] && tmpBigint <= BIGINT_FILTER_VALUE) { filterOut = true; } break; @@ -1103,7 +1101,8 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { } else { memcpy(jtCtx.inColBuf + jtCtx.inColOffset[c], pData, tDataTypes[jtInputColType[c]].bytes); } - addToRowList = true; + } else { + addToRowList = false; } } else if (keepRes && !filterOut && jtCtx.resColList[tableOffset + c]) { if (isNull) { @@ -1116,8 +1115,10 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { if (keepInput && addToRowList) { taosArrayPush(pTableRows, jtCtx.inColBuf); - bool fout = filterOut ? true : false; - taosArrayPush(jtCtx.rightFilterOut, &fout); + if (blkId == RIGHT_BLK_ID) { + bool fout = filterOut ? 
true : false; + taosArrayPush(jtCtx.rightFilterOut, &fout); + } } if (keepRes && !filterOut) { @@ -1134,14 +1135,14 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { } if (keepInput) { - if (jtCtx.asofOpType == OP_TYPE_GREATER_EQUAL || jtCtx.asofOpType == OP_TYPE_GREATER_THAN) { + if (jtCtx.asofOpType == OP_TYPE_GREATER_EQUAL || jtCtx.asofOpType == OP_TYPE_GREATER_THAN || jtCtx.asofOpType == OP_TYPE_EQUAL) { if (blkId == LEFT_BLK_ID) { appendAllAsofResRows(); } else { trimForAsofJlimit(); } } else { - chkAppendAsofGreaterResRows(); + chkAppendAsofGreaterResRows(false); } } @@ -1798,7 +1799,7 @@ void addAsofEqInRows(int32_t rowsNum, int64_t tbOffset, bool leftTable) { } } - if (!leftTable) { + if (!leftTable && (jtCtx.asofOpType == OP_TYPE_GREATER_EQUAL || jtCtx.asofOpType == OP_TYPE_GREATER_THAN || jtCtx.asofOpType == OP_TYPE_EQUAL)) { trimForAsofJlimit(); } } @@ -1833,6 +1834,9 @@ void asofJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { addAsofEqInRows(leftGrpRows, 0, true); addAsofEqInRows(rightGrpRows, rightTbOffset, false); chkAppendAsofGreaterResRows(true); + taosArrayClear(jtCtx.leftRowsList); + taosArrayClear(jtCtx.rightRowsList); + taosArrayClear(jtCtx.rightFilterOut); break; default: return; @@ -2095,6 +2099,10 @@ void createDummyBlkList(int32_t leftMaxRows, int32_t leftMaxGrpRows, int32_t rig jtCtx.rightFinMatch = (bool*)taosMemoryRealloc(jtCtx.rightFinMatch, maxGrpRows * sizeof(bool)); } + taosArrayClear(jtCtx.leftRowsList); + taosArrayClear(jtCtx.rightRowsList); + taosArrayClear(jtCtx.rightFilterOut); + createBothBlkRowsData(); } @@ -2245,6 +2253,23 @@ char* getInputStatStr(char* inputStat) { return inputStat; } +char* getAsofOpStr() { + switch (jtCtx.asofOpType) { + case OP_TYPE_GREATER_THAN: + return ">"; + case OP_TYPE_GREATER_EQUAL: + return ">="; + case OP_TYPE_LOWER_THAN: + return "<"; + case OP_TYPE_LOWER_EQUAL: + return "<="; + case OP_TYPE_EQUAL: + return "="; + default: + return "UNKNOWN"; + } +} + 
void printBasicInfo(char* caseName) { if (!jtCtrl.printTestInfo) { return; @@ -2257,6 +2282,10 @@ void printBasicInfo(char* caseName) { jtCtx.leftMaxGrpRows, jtCtx.rightMaxGrpRows, jtCtx.blkRows, jtColCondStr[jtCtx.colCond], jtJoinTypeStr[jtCtx.joinType], jtSubTypeStr[jtCtx.subType], getInputStatStr(inputStat)); + if (JOIN_STYPE_ASOF == jtCtx.subType) { + printf("\t asofOp:%s\n\t JLimit:%" PRId64 "\n", getAsofOpStr(), jtCtx.jLimit); + } + printf("Input Info:\n\t totalBlk:left-%d right-%d\n\t totalRows:left-%d right-%d\n\t " "blkRowSize:%d\n\t inputCols:left-%s %s %s %s right-%s %s %s %s\n", (int32_t)taosArrayGetSize(jtCtx.leftBlkList), (int32_t)taosArrayGetSize(jtCtx.rightBlkList), @@ -2475,7 +2504,11 @@ void initJoinTest() { offset += tDataTypes[jtInputColType[i]].bytes; } jtCtx.inColSize = offset; - jtCtx.inColBuf = taosMemoryMalloc(jtCtx.inColSize); + jtCtx.inColBuf = (char*)taosMemoryMalloc(jtCtx.inColSize); + + jtCtx.leftRowsList = taosArrayInit(1024, jtCtx.inColSize); + jtCtx.rightRowsList = taosArrayInit(1024, jtCtx.inColSize); + jtCtx.rightFilterOut = taosArrayInit(1024, sizeof(bool)); jtInitLogFile(); } @@ -2506,18 +2539,13 @@ void handleTestDone() { jtCtx.resRows = 0; jtCtx.inputStat = 0; - - taosArrayDestroy(jtCtx.leftRowsList); - taosArrayDestroy(jtCtx.rightRowsList); - jtCtx.leftRowsList = NULL; - jtCtx.rightRowsList = NULL; } void runSingleTest(char* caseName, SJoinTestParam* param) { bool contLoop = true; SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param); - createDummyBlkList(10, 10, 10, 10, 3); + createDummyBlkList(200, 200, 200, 200, 10); while (contLoop) { rerunBlockedHere(); @@ -3093,30 +3121,118 @@ TEST(leftAsofJoin, noCondGreaterThanTest) { #endif #if 1 -TEST(leftAsofJoin, eqCondTest) { +TEST(leftAsofJoin, noCondGreaterEqTest) { SJoinTestParam param; - char* caseName = "leftAsofJoin:eqCondTest"; + char* caseName = "leftAsofJoin:noCondGreaterEqTest"; SExecTaskInfo* pTask = createDummyTaskInfo(caseName); param.pTask = 
pTask; param.joinType = JOIN_TYPE_LEFT; - param.subType = JOIN_STYPE_ANTI; - param.cond = TEST_EQ_COND; + param.subType = JOIN_STYPE_ASOF; + param.cond = TEST_NO_COND; + param.asofOp = OP_TYPE_GREATER_EQUAL; param.asc = true; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1; + param.filter = false; runSingleTest(caseName, &param); param.filter = true; runSingleTest(caseName, &param); } - + printStatInfo(caseName); taosMemoryFree(pTask); - handleCaseEnd(); } #endif + +#if 1 +TEST(leftAsofJoin, noCondEqTest) { + SJoinTestParam param; + char* caseName = "leftAsofJoin:noCondEqTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_LEFT; + param.subType = JOIN_STYPE_ASOF; + param.cond = TEST_NO_COND; + param.asofOp = OP_TYPE_EQUAL; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1; + + param.filter = false; + runSingleTest(caseName, &param); + + param.filter = true; + runSingleTest(caseName, &param); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); +} +#endif + +#if 1 +TEST(leftAsofJoin, noCondLowerThanTest) { + SJoinTestParam param; + char* caseName = "leftAsofJoin:noCondLowerThanTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_LEFT; + param.subType = JOIN_STYPE_ASOF; + param.cond = TEST_NO_COND; + param.asofOp = OP_TYPE_LOWER_THAN; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.jLimit = taosRand() % 2 ? 
(1 + (taosRand() % JT_MAX_JLIMIT)) : 1; + + param.filter = false; + runSingleTest(caseName, &param); + + param.filter = true; + runSingleTest(caseName, &param); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); +} +#endif + + +#if 1 +TEST(leftAsofJoin, noCondLowerEqTest) { + SJoinTestParam param; + char* caseName = "leftAsofJoin:noCondLowerEqTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_LEFT; + param.subType = JOIN_STYPE_ASOF; + param.cond = TEST_NO_COND; + param.asofOp = OP_TYPE_LOWER_EQUAL; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1; + + param.filter = false; + runSingleTest(caseName, &param); + + param.filter = true; + runSingleTest(caseName, &param); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); +} +#endif + #endif