From 9b008716fe919a24d1fcb353dccf87f0ac17d609 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 2 Feb 2024 16:06:01 +0800 Subject: [PATCH] enh: add window join ut --- source/libs/executor/test/joinTests.cpp | 277 +++++++++++++++++++++-- source/libs/parser/src/parTranslater.c | 7 + tests/script/tsim/join/left_win_join.sim | 9 + 3 files changed, 274 insertions(+), 19 deletions(-) diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index f8263fc12c..c7f1a39fd6 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -78,6 +78,7 @@ enum { #define ALL_TABLE_COLS (LEFT_TABLE_COLS | RIGHT_TABLE_COLS) #define JT_MAX_JLIMIT 20 +#define JT_MAX_WINDOW_OFFSET 5 #define JT_KEY_SOLT_ID (MAX_SLOT_NUM - 1) #define JT_PRIM_TS_SLOT_ID 0 int32_t jtInputColType[MAX_SLOT_NUM] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_BIGINT}; @@ -116,6 +117,8 @@ typedef struct { int32_t subType; int32_t asofOpType; int64_t jLimit; + int64_t winStartOffset; + int64_t winEndOffset; int32_t leftTotalRows; int32_t rightTotalRows; @@ -796,9 +799,31 @@ SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(SJoinTestParam* param limitNode->limit = param->jLimit; p->pJLimit = (SNode*)limitNode; } - p->leftPrimSlotId = 0; - p->rightPrimSlotId = 0; + p->leftPrimSlotId = JT_PRIM_TS_SLOT_ID; + p->rightPrimSlotId = JT_PRIM_TS_SLOT_ID; p->node.inputTsOrder = param->asc ? ORDER_ASC : ORDER_DESC; + if (JOIN_STYPE_WIN == p->subType) { + SWindowOffsetNode* pOffset = (SWindowOffsetNode*)nodesMakeNode(QUERY_NODE_WINDOW_OFFSET); + SValueNode* pStart = nodesMakeNode(QUERY_NODE_VALUE); + SValueNode* pEnd = nodesMakeNode(QUERY_NODE_VALUE); + pStart->node.resType.type = TSDB_DATA_TYPE_BIGINT; + pStart->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; + pStart->datum.i = (taosRand() % 2) ? (-1 * taosRand() % JT_MAX_WINDOW_OFFSET) : (taosRand() % JT_MAX_WINDOW_OFFSET); + pEnd->node.resType.type = TSDB_DATA_TYPE_BIGINT; + pEnd->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; + pEnd->datum.i = (taosRand() % 2) ? (-1 * taosRand() % JT_MAX_WINDOW_OFFSET) : (taosRand() % JT_MAX_WINDOW_OFFSET); + pOffset->pStartOffset = (SNode*)pStart; + pOffset->pEndOffset = (SNode*)pEnd; + p->pWindowOffset = (SNode*)pOffset; + + if (pStart->datum.i <= pEnd->datum.i) { + jtCtx.winStartOffset = pStart->datum.i; + jtCtx.winEndOffset = pEnd->datum.i; + } else { + jtCtx.winStartOffset = pEnd->datum.i; + jtCtx.winEndOffset = pStart->datum.i; + } + } jtCtx.joinType = param->joinType; jtCtx.subType = param->subType; @@ -883,7 +908,7 @@ void appendAsofLeftEachResGrps(char* leftInRow, int32_t rightOffset, int32_t rig } } -void appendAsofLeftNonMatchGrp(char* leftInRow) { +void appendLeftNonMatchGrp(char* leftInRow) { memset(jtCtx.resColBuf, 0, jtCtx.resColSize); for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { if (!jtCtx.resColList[c]) { @@ -913,7 +938,7 @@ void appendAllAsofResRows() { if (0 == jtCtx.rightFilterNum) { for (int32_t i = 0; i < leftRows; ++i) { char* leftInRow = (char*)taosArrayGet(jtCtx.leftRowsList, i); - appendAsofLeftNonMatchGrp(leftInRow); + appendLeftNonMatchGrp(leftInRow); } } } else { @@ -967,7 +992,7 @@ void chkAppendAsofGreaterResRows(bool forceOut) { } if (0 == jtCtx.rightFilterNum) { - appendAsofLeftNonMatchGrp(leftRow); + appendLeftNonMatchGrp(leftRow); } } } @@ -977,6 +1002,117 @@ void chkAppendAsofGreaterResRows(bool forceOut) { taosArrayPopFrontBatch(jtCtx.leftRowsList, i); } + +void appendWinEachResGrps(char* leftInRow, int32_t rightOffset, int32_t rightRows) { + if (rightOffset < 0) { + appendLeftNonMatchGrp(leftInRow); + return; + } + + memset(jtCtx.resColBuf, 0, jtCtx.resColSize); + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + if (!jtCtx.resColList[c]) { + continue; + } + + if (*((bool*)leftInRow + c)) { + *(char*)(jtCtx.resColBuf + c) = true; + } else { + memcpy(jtCtx.resColBuf + jtCtx.resColOffset[c], leftInRow + jtCtx.inColOffset[c], tDataTypes[jtInputColType[c]].bytes); + } + } + + int32_t endIdx = rightRows + rightOffset; + for (int32_t r = rightOffset; r < endIdx; ++r) { + bool* rightFilterOut = (bool*)taosArrayGet(jtCtx.rightFilterOut, r); + if (*rightFilterOut) { + continue; + } + + char* rightResRows = (char*)taosArrayGet(jtCtx.rightRowsList, r); + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + if (jtCtx.resColList[MAX_SLOT_NUM + c]) { + if (*(bool*)(rightResRows + c)) { + *(bool*)(jtCtx.resColBuf + MAX_SLOT_NUM + c) = true; + memset(jtCtx.resColBuf + jtCtx.resColOffset[MAX_SLOT_NUM + c], 0, tDataTypes[jtInputColType[c]].bytes); + } else { + *(bool*)(jtCtx.resColBuf + MAX_SLOT_NUM + c) = false; + memcpy(jtCtx.resColBuf + jtCtx.resColOffset[MAX_SLOT_NUM + c], rightResRows + jtCtx.inColOffset[c], tDataTypes[jtInputColType[c]].bytes); + } + } + } + + pushResRow(jtCtx.resColBuf, jtCtx.resColSize); + } +} + +void chkAppendWinResRows(bool forceOut) { + int32_t rightRows = taosArrayGetSize(jtCtx.rightRowsList); + if (rightRows < jtCtx.jLimit && !forceOut) { + return; + } + + int32_t rightRemains = rightRows; + int32_t rightOffset = 0; + int32_t leftRows = taosArrayGetSize(jtCtx.leftRowsList); + int32_t i = 0; + for (; i < leftRows; ++i) { + char* leftRow = (char*)taosArrayGet(jtCtx.leftRowsList, i); + int64_t* leftTs = (int64_t*)(leftRow + jtCtx.inColOffset[JT_PRIM_TS_SLOT_ID]); + int64_t winStart = *leftTs + jtCtx.winStartOffset; + int64_t winEnd = *leftTs + jtCtx.winEndOffset; + int32_t winBeginIdx = -1; + bool append = false; + bool winClosed = false; + for (int32_t r = rightOffset; r < rightRows; ++r) { + char* rightRow = (char*)taosArrayGet(jtCtx.rightRowsList, r); + int64_t* rightTs = (int64_t*)(rightRow + jtCtx.inColOffset[JT_PRIM_TS_SLOT_ID]); + if (*rightTs < winStart) { + rightOffset++; + rightRemains--; + if (rightRemains < jtCtx.jLimit && !forceOut) { + taosArrayPopFrontBatch(jtCtx.rightRowsList, rightOffset); + taosArrayPopFrontBatch(jtCtx.rightFilterOut, rightOffset); + taosArrayPopFrontBatch(jtCtx.leftRowsList, i); + return; + } + + continue; + } else if (*rightTs > winEnd) { + winClosed = true; + appendWinEachResGrps(leftRow, winBeginIdx, r - winBeginIdx); + append = true; + break; + } + + if (-1 == winBeginIdx) { + winBeginIdx = r; + } + + if ((r - winBeginIdx + 1) >= jtCtx.jLimit) { + appendWinEachResGrps(leftRow, winBeginIdx, jtCtx.jLimit); + append = true; + break; + } + } + + if (!append) { + if (!forceOut) { + break; + } + + if (0 == jtCtx.rightFilterNum) { + appendLeftNonMatchGrp(leftRow); + } + } + } + + taosArrayPopFrontBatch(jtCtx.rightRowsList, rightOffset); + taosArrayPopFrontBatch(jtCtx.rightFilterOut, rightOffset); + taosArrayPopFrontBatch(jtCtx.leftRowsList, i); +} + + void trimForAsofJlimit() { int32_t rowNum = taosArrayGetSize(jtCtx.rightRowsList); if (rowNum <= jtCtx.jLimit) { @@ -1020,6 +1156,9 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { if (JOIN_STYPE_ASOF == jtCtx.subType) { keepInput = jtCtx.asofOpType != OP_TYPE_EQUAL ? true : (blkId == LEFT_BLK_ID); pTableRows = (blkId == LEFT_BLK_ID) ? jtCtx.leftRowsList : jtCtx.rightRowsList; + } else if (JOIN_STYPE_WIN == jtCtx.subType) { + keepInput = true; + pTableRows = (blkId == LEFT_BLK_ID) ? jtCtx.leftRowsList : jtCtx.rightRowsList; } int32_t filterNum = (blkId == LEFT_BLK_ID) ? jtCtx.leftFilterNum : jtCtx.rightFilterNum; @@ -1044,7 +1183,7 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { taosArrayPush((blkId == LEFT_BLK_ID) ? jtCtx.leftBlkList : jtCtx.rightBlkList, ppBlk); } - filterOut = (peerFilterNum > 0 && jtCtx.subType != JOIN_STYPE_ASOF) ? true : false; + filterOut = (peerFilterNum > 0 && (jtCtx.subType != JOIN_STYPE_ASOF && jtCtx.subType != JOIN_STYPE_WIN) ? true : false; if (!filterOut) { memset(jtCtx.resColBuf, 0, jtCtx.resColSize); if (keepInput) { @@ -1135,14 +1274,18 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { } if (keepInput) { - if (jtCtx.asofOpType == OP_TYPE_GREATER_EQUAL || jtCtx.asofOpType == OP_TYPE_GREATER_THAN || jtCtx.asofOpType == OP_TYPE_EQUAL) { - if (blkId == LEFT_BLK_ID) { - appendAllAsofResRows(); + if (JOIN_STYPE_ASOF == jtCtx.subType) { + if (jtCtx.asofOpType == OP_TYPE_GREATER_EQUAL || jtCtx.asofOpType == OP_TYPE_GREATER_THAN || jtCtx.asofOpType == OP_TYPE_EQUAL) { + if (blkId == LEFT_BLK_ID) { + appendAllAsofResRows(); + } else { + trimForAsofJlimit(); + } } else { - trimForAsofJlimit(); + chkAppendAsofGreaterResRows(false); } } else { - chkAppendAsofGreaterResRows(false); + chkAppendWinResRows(false); } } @@ -1749,7 +1892,7 @@ void antiJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { void addAsofEqInRows(int32_t rowsNum, int64_t tbOffset, bool leftTable) { bool filterOut = false; - void* cvalue = NULL, *filterValue = NULL; + void* cvalue = NULL; int64_t cbig = 0, fbig = 0; int32_t filterNum = leftTable ? jtCtx.leftFilterNum : jtCtx.rightFilterNum; int32_t* filterCol = leftTable ? jtCtx.leftFilterColList : jtCtx.rightFilterColList; @@ -1805,9 +1948,6 @@ void addAsofEqInRows(int32_t rowsNum, int64_t tbOffset, bool leftTable) { } void asofJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { - bool filterOut = false; - void* lValue = NULL, *rValue = NULL, *filterValue = NULL; - int64_t lBig = 0, rBig = 0, fbig = 0; int64_t rightTbOffset = jtCtx.blkRowSize * leftGrpRows; switch (jtCtx.asofOpType) { @@ -1844,6 +1984,66 @@ void asofJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { } +void addWinEqInRows(int32_t rowsNum, int64_t tbOffset, bool leftTable) { + bool filterOut = false; + void* cvalue = NULL; + int64_t cbig = 0, fbig = 0; + int32_t filterNum = leftTable ? jtCtx.leftFilterNum : jtCtx.rightFilterNum; + int32_t* filterCol = leftTable ? jtCtx.leftFilterColList : jtCtx.rightFilterColList; + SArray* rowList = leftTable ? jtCtx.leftRowsList : jtCtx.rightRowsList; + + for (int32_t l = 0; l < rowsNum; ++l) { + char* row = jtCtx.colRowDataBuf + tbOffset + jtCtx.blkRowSize * l; + + filterOut = false; + + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + cvalue = row + jtCtx.colRowOffset[c]; + switch (jtInputColType[c]) { + case TSDB_DATA_TYPE_TIMESTAMP: + fbig = TIMESTAMP_FILTER_VALUE; + cbig = *(int64_t*)cvalue; + break; + case TSDB_DATA_TYPE_INT: + fbig = INT_FILTER_VALUE; + cbig = *(int32_t*)cvalue; + break; + case TSDB_DATA_TYPE_BIGINT: + fbig = BIGINT_FILTER_VALUE; + cbig = *(int64_t*)cvalue; + break; + default: + break; + } + + if (filterNum && filterCol[c] && ((*(bool*)(row + c)) || cbig <= fbig)) { + filterOut = true; + break; + } + } + + if (filterOut && leftTable) { + continue; + } + + taosArrayPush(rowList, row); + if (!leftTable) { + taosArrayPush(jtCtx.rightFilterOut, &filterOut); + } + } +} + + +void winJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { + int64_t rightTbOffset = jtCtx.blkRowSize * leftGrpRows; + + addWinEqInRows(leftGrpRows, 0, true); + addWinEqInRows(rightGrpRows, rightTbOffset, false); + chkAppendWinResRows(false); +} + + + void fullJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { bool leftMatch = false, rightMatch = false, lfilterOut = false, rfilterOut = false; void* lValue = NULL, *rValue = NULL, *filterValue = NULL; @@ -1979,6 +2179,9 @@ void appendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { case JOIN_STYPE_ASOF: asofJoinAppendEqGrpRes(leftGrpRows, rightGrpRows); break; + case JOIN_STYPE_WIN: + winJoinAppendEqGrpRes(leftGrpRows, rightGrpRows); + break; default: break; } @@ -2083,6 +2286,8 @@ void createBothBlkRowsData(void) { if (JOIN_STYPE_ASOF == jtCtx.subType && taosArrayGetSize(jtCtx.leftRowsList) > 0) { ASSERT(OP_TYPE_LOWER_EQUAL == jtCtx.asofOpType || OP_TYPE_LOWER_THAN == jtCtx.asofOpType); chkAppendAsofGreaterResRows(true); + } else if (JOIN_STYPE_WIN == jtCtx.subType && taosArrayGetSize(jtCtx.leftRowsList) > 0) { + chkAppendWinResRows(true); } } @@ -2284,6 +2489,8 @@ void printBasicInfo(char* caseName) { if (JOIN_STYPE_ASOF == jtCtx.subType) { printf("\t asofOp:%s\n\t JLimit:%" PRId64 "\n", getAsofOpStr(), jtCtx.jLimit); + } else if (JOIN_STYPE_WIN == jtCtx.subType) { + printf("\t windowOffset:[%" PRId64 ", %" PRId64 "]\n\t JLimit:%" PRId64 "\n", jtCtx.winStartOffset, jtCtx.winEndOffset, jtCtx.jLimit); } printf("Input Info:\n\t totalBlk:left-%d right-%d\n\t totalRows:left-%d right-%d\n\t " @@ -2545,7 +2752,7 @@ void runSingleTest(char* caseName, SJoinTestParam* param) { bool contLoop = true; SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param); - createDummyBlkList(200, 200, 200, 200, 10); + createDummyBlkList(10, 10, 10, 10, 3); while (contLoop) { rerunBlockedHere(); @@ -3091,8 +3298,8 @@ TEST(leftAntiJoin, fullCondTest) { #endif #endif -#if 1 #if 0 +#if 1 TEST(leftAsofJoin, noCondGreaterThanTest) { SJoinTestParam param; char* caseName = "leftAsofJoin:noCondGreaterThanTest"; @@ -3120,7 +3327,7 @@ TEST(leftAsofJoin, noCondGreaterThanTest) { } #endif -#if 0 +#if 1 TEST(leftAsofJoin, noCondGreaterEqTest) { SJoinTestParam param; char* caseName = "leftAsofJoin:noCondGreaterEqTest"; @@ -3148,7 +3355,7 @@ TEST(leftAsofJoin, noCondGreaterEqTest) { } #endif -#if 0 +#if 1 TEST(leftAsofJoin, noCondEqTest) { SJoinTestParam param; char* caseName = "leftAsofJoin:noCondEqTest"; @@ -3236,6 +3443,38 @@ TEST(leftAsofJoin, noCondLowerEqTest) { #endif +#if 1 +#if 1 +TEST(leftWinJoin, noCondProjectionTest) { + SJoinTestParam param; + char* caseName = "leftWinJoin:noCondProjectionTest"; + SExecTaskInfo* pTask = createDummyTaskInfo(caseName); + + param.pTask = pTask; + param.joinType = JOIN_TYPE_LEFT; + param.subType = JOIN_STYPE_WIN; + param.cond = TEST_NO_COND; + param.asc = true; + + for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1; + + param.filter = false; + runSingleTest(caseName, ¶m); + + param.filter = true; + runSingleTest(caseName, ¶m); + } + + printStatInfo(caseName); + taosMemoryFree(pTask); +} +#endif + + +#endif + + int main(int argc, char** argv) { taosSeedRand(taosGetTimestampSec()); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 06f827c985..ce589f326d 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3268,6 +3268,13 @@ static int32_t translateJoinTable(STranslateContext* pCxt, SJoinTableNode* pJoin return buildInvalidOperationMsg(&pCxt->msgBuf, "WINDOW_OFFSET only supported for WINDOW join"); } code = translateExpr(pCxt, &pJoinTable->pWindowOffset); + if (TSDB_CODE_SUCCESS == code) { + SValueNode* pStart = (SValueNode*)((SWindowOffsetNode*)pJoinTable->pWindowOffset)->pStartOffset; + SValueNode* pEnd = (SValueNode*)((SWindowOffsetNode*)pJoinTable->pWindowOffset)->pEndOffset; + if (pStart->datum.i > pEnd->datum.i) { + TSWAP(((SWindowOffsetNode*)pJoinTable->pWindowOffset)->pStartOffset, ((SWindowOffsetNode*)pJoinTable->pWindowOffset)->pEndOffset); + } + } } else if (*pSType == JOIN_STYPE_WIN) { return buildInvalidOperationMsg(&pCxt->msgBuf, "WINDOW_OFFSET required for WINDOW join"); } diff --git a/tests/script/tsim/join/left_win_join.sim b/tests/script/tsim/join/left_win_join.sim index 0ce5592d55..934a860f7e 100644 --- a/tests/script/tsim/join/left_win_join.sim +++ b/tests/script/tsim/join/left_win_join.sim @@ -166,3 +166,12 @@ sql select a.ts, b.ts from tba1 a left window join tba2 b window_offset(-1h, 1h) if $rows != 16 then return -1 endi +sql select a.ts, b.ts from tba1 a left window join tba2 b window_offset(1h, -1h); +if $rows != 16 then + return -1 +endi + +sql select a.ts, b.ts from tba1 a left window join tba2 b window_offset(1a, -1h); +if $rows != 9 then + return -1 +endi