From c4dbae3a5a573d0c3c5a45ab0bb07d236f14e9d6 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 20 Feb 2024 17:13:29 -0800 Subject: [PATCH] enh: add window join ut --- source/libs/command/src/explain.c | 23 +-- source/libs/executor/inc/mergejoin.h | 2 +- source/libs/executor/src/mergejoin.c | 23 ++- source/libs/executor/test/joinTests.cpp | 43 +++--- source/libs/planner/src/planOptimizer.c | 2 +- tests/script/tsim/join/join.sim | 2 + tests/script/tsim/join/right_win_join.sim | 177 ++++++++++++++++++++++ 7 files changed, 236 insertions(+), 36 deletions(-) create mode 100644 tests/script/tsim/join/right_win_join.sim diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index de7d7ee111..d8268660b3 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -628,16 +628,21 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ON_CONDITIONS_FORMAT); - QRY_ERR_RET( - nodesNodeToSQL(pJoinNode->pPrimKeyCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); - if (pJoinNode->pFullOnCond) { - EXPLAIN_ROW_APPEND(" AND "); - QRY_ERR_RET( - nodesNodeToSQL(pJoinNode->pFullOnCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + if (pJoinNode->pPrimKeyCond || pJoinNode->pFullOnCond) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ON_CONDITIONS_FORMAT); + if (pJoinNode->pPrimKeyCond) { + QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pPrimKeyCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + } + if (pJoinNode->pFullOnCond) { + if (pJoinNode->pPrimKeyCond) { + EXPLAIN_ROW_APPEND(" AND "); + } + QRY_ERR_RET( + nodesNodeToSQL(pJoinNode->pFullOnCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + } + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } break; } diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index c3c2a39320..13660fd268 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -19,7 +19,7 @@ extern "C" { #endif -#if 1 +#if 0 #define MJOIN_DEFAULT_BLK_ROWS_NUM 2 //4096 #define MJOIN_HJOIN_CART_THRESHOLD 10 #define MJOIN_BLK_SIZE_LIMIT 0 //10485760 diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index b6fc8c344d..c33f4e4a2d 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -2696,11 +2696,14 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) { SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i); SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCol->srcSlot); if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) < pCtx->winBeginTs) { - blockDataDestroy(pGrp->blk); + pCache->rowNum -= (pGrp->blk->info.rows - pGrp->beginIdx); + if (pGrp->blk == pCache->outBlk) { + blockDataCleanup(pGrp->blk); + } + taosArrayPopFrontBatch(pCache->grps, 1); grpNum--; i--; - pCache->rowNum -= (pGrp->blk->info.rows - pGrp->beginIdx); continue; } @@ -2712,7 +2715,12 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) { if (*((int64_t*)pCol->pData + pGrp->beginIdx) <= pCtx->winEndTs) { pGrp->readIdx = pGrp->beginIdx; - pCache->rowNum -= (pGrp->beginIdx - startIdx); + if (pGrp->endIdx < pGrp->beginIdx) { + pGrp->endIdx = pGrp->beginIdx; + pCache->rowNum = 1; + } else { + pCache->rowNum -= (pGrp->beginIdx - startIdx); + } return TSDB_CODE_SUCCESS; } @@ -2772,7 +2780,7 @@ _return: int32_t mWinJoinMoveWinEnd(SMJoinWindowCtx* pCtx) { SMJoinWinCache* pCache = &pCtx->cache; int32_t grpNum = taosArrayGetSize(pCache->grps); - if (grpNum <= 0) { + if (grpNum <= 0 || pCache->rowNum >= pCtx->jLimit) { return TSDB_CODE_SUCCESS; } @@ -2790,9 +2798,13 @@ int32_t mWinJoinMoveWinEnd(SMJoinWindowCtx* pCtx) { } } else { int32_t startIdx = pGrp->endIdx; - for (; pGrp->endIdx < pGrp->blk->info.rows && pCache->rowNum < pCtx->jLimit; pGrp->endIdx++) { + for (; pCache->rowNum < pCtx->jLimit && ++pGrp->endIdx < pGrp->blk->info.rows; ) { if (*((int64_t*)pCol->pData + pGrp->endIdx) <= pCtx->winEndTs) { pCache->rowNum++; + if ((pGrp->endIdx + 1) >= pGrp->blk->info.rows) { + break; + } + continue; } @@ -2998,6 +3010,7 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ if (JOIN_STYPE_ASOF == pJoinNode->subType || JOIN_STYPE_WIN == pJoinNode->subType) { pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1; pJoin->subType = JOIN_STYPE_OUTER; + pJoin->build->eqRowLimit = pCtx->jLimit; } else { pCtx->jLimit = -1; } diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index c7f1a39fd6..cde17a435a 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -66,7 +66,7 @@ enum { }; #define COL_DISPLAY_WIDTH 18 -#define JT_MAX_LOOP 50000 +#define JT_MAX_LOOP 10000 #define LEFT_BLK_ID 0 #define RIGHT_BLK_ID 1 @@ -199,7 +199,7 @@ typedef struct { SJoinTestCtx jtCtx = {0}; -SJoinTestCtrl jtCtrl = {1, 1, 1, 0}; +SJoinTestCtrl jtCtrl = {0, 0, 0, 0}; SJoinTestStat jtStat = {0}; SJoinTestResInfo jtRes = {0}; @@ -794,35 +794,34 @@ SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(SJoinTestParam* param p->joinType = param->joinType; p->subType = param->subType; p->asofOpType = param->asofOp; - if (param->jLimit > 1 || taosRand() % 2) { + if (p->subType == JOIN_STYPE_WIN || param->jLimit > 1 || taosRand() % 2) { SLimitNode* limitNode = (SLimitNode*)nodesMakeNode(QUERY_NODE_LIMIT); limitNode->limit = param->jLimit; p->pJLimit = (SNode*)limitNode; } + p->leftPrimSlotId = JT_PRIM_TS_SLOT_ID; p->rightPrimSlotId = JT_PRIM_TS_SLOT_ID; p->node.inputTsOrder = param->asc ? ORDER_ASC : ORDER_DESC; if (JOIN_STYPE_WIN == p->subType) { SWindowOffsetNode* pOffset = (SWindowOffsetNode*)nodesMakeNode(QUERY_NODE_WINDOW_OFFSET); - SValueNode* pStart = nodesMakeNode(QUERY_NODE_VALUE); - SValueNode* pEnd = nodesMakeNode(QUERY_NODE_VALUE); + SValueNode* pStart = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + SValueNode* pEnd = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); pStart->node.resType.type = TSDB_DATA_TYPE_BIGINT; pStart->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; - pStart->datum.i = (taosRand() % 2) ? (-1 * taosRand() % JT_MAX_WINDOW_OFFSET) : (taosRand() % JT_MAX_WINDOW_OFFSET); + pStart->datum.i = (taosRand() % 2) ? (((int32_t)-1) * (int64_t)(taosRand() % JT_MAX_WINDOW_OFFSET)) : (taosRand() % JT_MAX_WINDOW_OFFSET); pEnd->node.resType.type = TSDB_DATA_TYPE_BIGINT; pEnd->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; - pEnd->datum.i = (taosRand() % 2) ? (-1 * taosRand() % JT_MAX_WINDOW_OFFSET) : (taosRand() % JT_MAX_WINDOW_OFFSET); + pEnd->datum.i = (taosRand() % 2) ? (((int32_t)-1) * (int64_t)(taosRand() % JT_MAX_WINDOW_OFFSET)) : (taosRand() % JT_MAX_WINDOW_OFFSET); + if (pStart->datum.i > pEnd->datum.i) { + TSWAP(pStart->datum.i, pEnd->datum.i); + } pOffset->pStartOffset = (SNode*)pStart; pOffset->pEndOffset = (SNode*)pEnd; p->pWindowOffset = (SNode*)pOffset; - if (pStart->datum.i <= pEnd->datum.i) { - jtCtx.winStartOffset = pStart->datum.i; - jtCtx.winEndOffset = pEnd->datum.i; - } else { - jtCtx.winStartOffset = pEnd->datum.i; - jtCtx.winEndOffset = pStart->datum.i; - } + jtCtx.winStartOffset = pStart->datum.i; + jtCtx.winEndOffset = pEnd->datum.i; } jtCtx.joinType = param->joinType; @@ -1005,7 +1004,9 @@ void chkAppendAsofGreaterResRows(bool forceOut) { void appendWinEachResGrps(char* leftInRow, int32_t rightOffset, int32_t rightRows) { if (rightOffset < 0) { - appendLeftNonMatchGrp(leftInRow); + if (0 == jtCtx.rightFilterNum) { + appendLeftNonMatchGrp(leftInRow); + } return; } @@ -1101,7 +1102,9 @@ void chkAppendWinResRows(bool forceOut) { break; } - if (0 == jtCtx.rightFilterNum) { + if (winBeginIdx >= 0) { + appendWinEachResGrps(leftRow, winBeginIdx, rightRows - winBeginIdx); + } else if (0 == jtCtx.rightFilterNum) { appendLeftNonMatchGrp(leftRow); } } @@ -1142,12 +1145,12 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { bool keepRes = false; bool keepInput = false; if (blkId == LEFT_BLK_ID) { - if ((jtCtx.joinType == JOIN_TYPE_LEFT || jtCtx.joinType == JOIN_TYPE_FULL) && (jtCtx.subType != JOIN_STYPE_SEMI && jtCtx.subType != JOIN_STYPE_ASOF)) { + if ((jtCtx.joinType == JOIN_TYPE_LEFT || jtCtx.joinType == JOIN_TYPE_FULL) && (jtCtx.subType != JOIN_STYPE_SEMI && jtCtx.subType != JOIN_STYPE_ASOF && jtCtx.subType != JOIN_STYPE_WIN)) { keepRes = true; } peerOffset = MAX_SLOT_NUM; } else { - if ((jtCtx.joinType == JOIN_TYPE_RIGHT || jtCtx.joinType == JOIN_TYPE_FULL) && (jtCtx.subType != JOIN_STYPE_SEMI && jtCtx.subType != JOIN_STYPE_ASOF)) { + if ((jtCtx.joinType == JOIN_TYPE_RIGHT || jtCtx.joinType == JOIN_TYPE_FULL) && (jtCtx.subType != JOIN_STYPE_SEMI && jtCtx.subType != JOIN_STYPE_ASOF && jtCtx.subType != JOIN_STYPE_WIN)) { keepRes = true; } tableOffset = MAX_SLOT_NUM; @@ -1183,7 +1186,7 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { taosArrayPush((blkId == LEFT_BLK_ID) ? jtCtx.leftBlkList : jtCtx.rightBlkList, ppBlk); } - filterOut = (peerFilterNum > 0 && (jtCtx.subType != JOIN_STYPE_ASOF && jtCtx.subType != JOIN_STYPE_WIN) ? true : false; + filterOut = (peerFilterNum > 0 && (jtCtx.subType != JOIN_STYPE_ASOF && jtCtx.subType != JOIN_STYPE_WIN)) ? true : false; if (!filterOut) { memset(jtCtx.resColBuf, 0, jtCtx.resColSize); if (keepInput) { @@ -2752,7 +2755,7 @@ void runSingleTest(char* caseName, SJoinTestParam* param) { bool contLoop = true; SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param); - createDummyBlkList(10, 10, 10, 10, 3); + createDummyBlkList(200, 200, 200, 200, 20); while (contLoop) { rerunBlockedHere(); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 8326446c31..79815a4acf 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -4055,7 +4055,7 @@ static bool stbJoinOptShouldBeOptimized(SLogicNode* pNode) { } SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode; - if (pJoin->isSingleTableJoin || NULL == pJoin->pTagEqCond || pNode->pChildren->length != 2 + if (JOIN_STYPE_NONE != pJoin->subType || pJoin->isSingleTableJoin || NULL == pJoin->pTagEqCond || pNode->pChildren->length != 2 || pJoin->hasSubQuery || pJoin->joinAlgo != JOIN_ALGO_UNKNOWN || pJoin->isLowLevelJoin) { if (pJoin->joinAlgo == JOIN_ALGO_UNKNOWN) { pJoin->joinAlgo = JOIN_ALGO_MERGE; diff --git a/tests/script/tsim/join/join.sim b/tests/script/tsim/join/join.sim index 5a883f4612..d1e4214ac9 100644 --- a/tests/script/tsim/join/join.sim +++ b/tests/script/tsim/join/join.sim @@ -68,6 +68,7 @@ run tsim/join/right_anti_join.sim run tsim/join/left_asof_join.sim run tsim/join/right_asof_join.sim run tsim/join/left_win_join.sim +run tsim/join/right_win_join.sim print ================== restart server to commit data into disk system sh/exec.sh -n dnode1 -s stop -x SIGINT @@ -85,5 +86,6 @@ run tsim/join/right_anti_join.sim run tsim/join/left_asof_join.sim run tsim/join/right_asof_join.sim run tsim/join/left_win_join.sim +run tsim/join/right_win_join.sim system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/join/right_win_join.sim b/tests/script/tsim/join/right_win_join.sim new file mode 100644 index 0000000000..07bed65655 --- /dev/null +++ b/tests/script/tsim/join/right_win_join.sim @@ -0,0 +1,177 @@ +sql connect +sql use test0; + +sql_error select a.col1, b.col1 from sta a right window join sta b on a.ts = b.ts and a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' window_offset(-1s, 1s) order by a.col1, b.col1; +sql_error select a.col1, b.col1 from sta a right window join sta b on a.ts = b.ts order by a.col1, b.col1; +sql_error select a.col1, b.col1 from sta a right window join sta b on a.ts = b.ts window_offset(-1s, 1s) order by a.col1, b.col1; +sql select a.col1, b.col1 from sta a right window join sta b window_offset(-1s, 1s) order by a.col1, b.col1; +if $rows != 28 then + return -1 +endi +sql select a.col1, b.col1 from sta a right window join sta b window_offset(-1s, 1s) jlimit 2 order by a.col1, b.col1; +if $rows != 16 then + return -1 +endi +sql select a.col1, b.col1 from sta a right window join sta b window_offset(1s, 1s) order by a.col1, b.col1; +if $rows != 9 then + return -1 +endi +sql select a.col1, b.col1 from sta a right window join sta b window_offset(-1s, 1s) where a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by a.col1, b.col1; +if $rows != 6 then + return -1 +endi +if $data00 != 1 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +if $data10 != 1 then + return -1 +endi +if $data11 != 2 then + return -1 +endi +if $data20 != 2 then + return -1 +endi +if $data21 != 1 then + return -1 +endi +if $data30 != 2 then + return -1 +endi +if $data31 != 2 then + return -1 +endi +if $data40 != 3 then + return -1 +endi +if $data41 != 1 then + return -1 +endi +if $data50 != 3 then + return -1 +endi +if $data51 != 2 then + return -1 +endi + +sql select a.ts, b.ts from tba1 a right window join tba2 b window_offset(-1s, 1s) +if $rows != 7 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data40 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data41 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data50 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data51 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data60 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data61 != @23-11-17 16:29:05.000@ then + return -1 +endi + +sql select a.ts, b.ts from tba1 a right window join tba2 b window_offset(-1s, 1s) jlimit 1; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:05.000@ then + return -1 +endi + +sql select a.ts, b.ts from tba1 a right window join tba2 b window_offset(-1a, 1a) jlimit 1; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != NULL then + return -1 +endi +if $data11 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data30 != NULL then + return -1 +endi +if $data31 != @23-11-17 16:29:05.000@ then + return -1 +endi + +sql select a.ts, b.ts from tba1 a right window join tba2 b window_offset(-1h, 1h); +if $rows != 16 then + return -1 +endi +sql select a.ts, b.ts from tba1 a right window join tba2 b window_offset(1h, -1h); +if $rows != 16 then + return -1 +endi + +sql select a.ts, b.ts from tba1 a right window join tba2 b window_offset(1a, -1h); +if $rows != 9 then + return -1 +endi