From 888343f6fa9389cc46b2163a99baab60b9e20e20 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 22 Jan 2024 19:24:31 +0800 Subject: [PATCH] enh: add asof ut --- source/libs/executor/inc/mergejoin.h | 2 +- source/libs/executor/src/mergejoin.c | 2 +- source/libs/executor/test/joinTests.cpp | 139 ++++++++++++++++++++++-- 3 files changed, 131 insertions(+), 12 deletions(-) diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index a8e28d56b9..9b4a4c1fde 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -20,7 +20,7 @@ extern "C" { #endif #if 1 -#define MJOIN_DEFAULT_BLK_ROWS_NUM 10 //4096 +#define MJOIN_DEFAULT_BLK_ROWS_NUM 2 //4096 #define MJOIN_HJOIN_CART_THRESHOLD 10 #define MJOIN_BLK_SIZE_LIMIT 0 //10485760 #define MJOIN_ROW_BITMAP_SIZE (2 * 1048576) diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index 5c98e0cab9..9cc1b9a9ab 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -1837,7 +1837,7 @@ int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindow } int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) { - if (pCtx->cache.outBlk->info.rows <= 0) { + if (NULL == pCtx->cache.outBlk || pCtx->cache.outBlk->info.rows <= 0) { return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true); } diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index de5d501d04..e716f070b8 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -113,6 +113,7 @@ typedef struct { int32_t colCond; int32_t joinType; int32_t subType; + int32_t asofOpType; int64_t jLimit; int32_t leftTotalRows; @@ -167,6 +168,12 @@ typedef struct { int32_t rightFinMatchNum; bool* rightFinMatch; + + int64_t leftRowsNum; + SArray* leftRowsList; + int64_t rightRowsNum; + SArray* rightRowsList; + SArray* rightFilterOut; } SJoinTestCtx; typedef struct { @@ -795,6 +802,7 @@ SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(SJoinTestParam* param jtCtx.subType = param->subType; jtCtx.asc = param->asc; jtCtx.jLimit = param->jLimit; + jtCtx.asofOpType = param->asofOp; jtCtx.leftColOnly = (JOIN_TYPE_LEFT == param->joinType && JOIN_STYPE_SEMI == param->subType); jtCtx.rightColOnly = (JOIN_TYPE_RIGHT == param->joinType && JOIN_STYPE_SEMI == param->subType); @@ -835,6 +843,62 @@ SSDataBlock* createDummyBlock(int32_t blkId) { return p; } +void appendAllAsofResRows() { + int32_t leftRows = taosArrayGetSize(jtCtx.leftRowsList); + int32_t rightRows = taosArrayGetSize(jtCtx.rightRowsList); + if (rightRows <= 0) { + for (int32_t i = 0; i < leftRows; ++i) { + char* leftResRows = taosArrayGet(jtCtx.leftRowsList, i); + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + if (jtCtx.resColList[MAX_SLOT_NUM + c]) { + *(char*)(leftResRows + MAX_SLOT_NUM + c) = true; + } + } + + pushResRow(leftResRows, jtCtx.resColSize); + } + } else { + ASSERT(rightRows <= jtCtx.jLimit); + for (int32_t i = 0; i < leftRows; ++i) { + char* leftResRows = taosArrayGet(jtCtx.leftRowsList, i); + for (int32_t r = 0; r < rightRows; ++r) { + char* rightResRows = taosArrayGet(jtCtx.rightRowsList, r); + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + if (jtCtx.resColList[MAX_SLOT_NUM + c]) { + if (*(bool*)(rightResRows + MAX_SLOT_NUM + c)) { + *(bool*)(leftResRows + MAX_SLOT_NUM + c) = true; + } else { + *(bool*)(leftResRows + MAX_SLOT_NUM + c) = false; + memcpy(leftResRows + jtCtx.resColOffset[MAX_SLOT_NUM + c], rightResRows + jtCtx.resColOffset[MAX_SLOT_NUM + c], tDataTypes[jtInputColType[c]].bytes); + } + } + } + + pushResRow(leftResRows, jtCtx.resColSize); + } + } + } +} + +void checkAppendAsofResRows(bool forceOut) { + int32_t rightRows = taosArrayGetSize(jtCtx.rightRowsList); + if (rightRows <= 0 && !forceOut) { + return; + } + + +} + +void trimForAsofJlimit() { + int32_t rowNum = taosArrayGetSize(jtCtx.rightRowsList); + if (rowNum <= jtCtx.jLimit) { + return; + } + + taosArrayPopFrontBatch(jtCtx.rightRowsList, rowNum - jtCtx.jLimit); + taosArrayPopFrontBatch(jtCtx.rightFilterOut, rowNum - jtCtx.jLimit); +} + void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { if (grpRows <= 0) { return; @@ -848,9 +912,11 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { jtCtx.inputStat |= (1 << blkId); + SArray* pTableRows = NULL; int32_t tableOffset = 0; int32_t peerOffset = 0; bool keepRes = false; + bool keepInput = false; if (blkId == LEFT_BLK_ID) { if ((jtCtx.joinType == JOIN_TYPE_LEFT || jtCtx.joinType == JOIN_TYPE_FULL) && jtCtx.subType != JOIN_STYPE_SEMI) { keepRes = true; @@ -862,6 +928,24 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { } tableOffset = MAX_SLOT_NUM; } + + if (JOIN_STYPE_ASOF == jtCtx.subType && jtCtx.asofOpType != OP_TYPE_EQUAL) { + keepInput = true; + if (blkId == LEFT_BLK_ID) { + if (NULL == jtCtx.leftRowsList) { + jtCtx.leftRowsList = taosArrayInit(jtCtx.jLimit, jtCtx.resColSize); + jtCtx.leftRowsNum = 0; + } + pTableRows = jtCtx.leftRowsList; + } else { + if (NULL == jtCtx.rightRowsList) { + jtCtx.rightRowsList = taosArrayInit(jtCtx.jLimit, jtCtx.resColSize); + jtCtx.rightFilterOut = taosArrayInit(jtCtx.jLimit, sizeof(bool)); + jtCtx.rightRowsNum = 0; + } + pTableRows = jtCtx.rightRowsList; + } + } int32_t filterNum = (blkId == LEFT_BLK_ID) ? jtCtx.leftFilterNum : jtCtx.rightFilterNum; int32_t peerFilterNum = (blkId == LEFT_BLK_ID) ? jtCtx.rightFilterNum : jtCtx.leftFilterNum; @@ -872,6 +956,7 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { int64_t tmpBigint = 0; bool isNull = false; bool filterOut = false; + bool addToRowList = false; int32_t vRange = TMAX(grpRows / 3, 3); for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { jtCtx.grpOffset[c] = c * TMAX(100, grpRows); @@ -888,6 +973,8 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { if (!filterOut) { memset(jtCtx.resColBuf, 0, jtCtx.resColSize); } + + addToRowList = false; for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { switch (jtInputColType[c]) { @@ -929,7 +1016,16 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet((*ppBlk)->pDataBlock, c); colDataSetVal(pCol, (*ppBlk)->info.rows, pData, isNull); - if (keepRes && !filterOut && jtCtx.resColList[tableOffset + c]) { + if (keepInput && jtCtx.resColList[tableOffset + c]) { + if (!filterOut || (filterOut && blkId != LEFT_BLK_ID)) { + if (isNull) { + *(char*)(jtCtx.resColBuf + tableOffset + c) = true; + } else { + memcpy(jtCtx.resColBuf + jtCtx.resColOffset[tableOffset + c], pData, tDataTypes[jtInputColType[c]].bytes); + } + addToRowList = true; + } + } else if (keepRes && !filterOut && jtCtx.resColList[tableOffset + c]) { if (isNull) { *(char*)(jtCtx.resColBuf + tableOffset + c) = true; } else { @@ -938,6 +1034,12 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { } } + if (keepInput && addToRowList) { + taosArrayPush(pTableRows, jtCtx.resColBuf); + bool fout = filterOut ? true : false; + taosArrayPush(jtCtx.rightFilterOut, &fout); + } + if (keepRes && !filterOut) { for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { if (jtCtx.resColList[peerOffset + c]) { @@ -950,6 +1052,19 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { (*ppBlk)->info.rows++; } + + if (keepInput) { + if (jtCtx.asofOpType == OP_TYPE_GREATER_EQUAL || jtCtx.asofOpType == OP_TYPE_GREATER_THAN) { + if (blkId == LEFT_BLK_ID) { + appendAllAsofResRows(); + } else { + trimForAsofJlimit(); + } + } else { + checkAppendAsofResRows(); + } + } + } void createRowData(SSDataBlock* pBlk, int64_t tbOffset, int32_t rowIdx, int32_t vRange) { @@ -2165,7 +2280,6 @@ void jtInitLogFile() { void initJoinTest() { jtCtx.leftBlkList = taosArrayInit(10, POINTER_BYTES); jtCtx.rightBlkList = taosArrayInit(10, POINTER_BYTES); - jtCtx.jtResRows = tSimpleHashInit(10000000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); joinTestReplaceRetrieveFp(); @@ -2203,13 +2317,18 @@ void handleTestDone() { jtCtx.resRows = 0; jtCtx.inputStat = 0; + + taosArrayDestroy(jtCtx.leftRowsList); + taosArrayDestroy(jtCtx.rightRowsList); + jtCtx.leftRowsList = NULL; + jtCtx.rightRowsList = NULL; } void runSingleTest(char* caseName, SJoinTestParam* param) { bool contLoop = true; SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param); - createDummyBlkList(200, 200, 200, 200, 20); + createDummyBlkList(10, 10, 10, 10, 3); while (contLoop) { rerunBlockedHere(); @@ -2241,7 +2360,7 @@ void handleCaseEnd() { } // namespace -#if 1 +#if 0 #if 1 TEST(innerJoin, noCondTest) { SJoinTestParam param; @@ -2344,7 +2463,7 @@ TEST(innerJoin, fullCondTest) { #endif -#if 1 +#if 0 #if 1 TEST(leftOuterJoin, noCondTest) { SJoinTestParam param; @@ -2446,7 +2565,7 @@ TEST(leftOuterJoin, fullCondTest) { #endif #endif -#if 1 +#if 0 #if 1 TEST(fullOuterJoin, noCondTest) { SJoinTestParam param; @@ -2549,7 +2668,7 @@ TEST(fullOuterJoin, fullCondTest) { #endif #endif -#if 1 +#if 0 #if 1 TEST(leftSemiJoin, noCondTest) { SJoinTestParam param; @@ -2652,7 +2771,7 @@ TEST(leftSemiJoin, fullCondTest) { #endif #endif -#if 1 +#if 0 #if 1 TEST(leftAntiJoin, noCondTest) { SJoinTestParam param; @@ -2759,7 +2878,7 @@ TEST(leftAntiJoin, fullCondTest) { #if 1 TEST(leftAsofJoin, noCondGreaterThanTest) { SJoinTestParam param; - char* caseName = "leftAntiJoin:noCondGreaterThanTest"; + char* caseName = "leftAsofJoin:noCondGreaterThanTest"; SExecTaskInfo* pTask = createDummyTaskInfo(caseName); param.pTask = pTask; @@ -2787,7 +2906,7 @@ TEST(leftAsofJoin, noCondGreaterThanTest) { #if 1 TEST(leftAsofJoin, eqCondTest) { SJoinTestParam param; - char* caseName = "leftAntiJoin:eqCondTest"; + char* caseName = "leftAsofJoin:eqCondTest"; SExecTaskInfo* pTask = createDummyTaskInfo(caseName); param.pTask = pTask;