enh: add window join ut

This commit is contained in:
dapan1121 2024-02-20 17:13:29 -08:00
parent b26716f2e8
commit c4dbae3a5a
7 changed files with 236 additions and 36 deletions

View File

@ -628,17 +628,22 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
if (pJoinNode->pPrimKeyCond || pJoinNode->pFullOnCond) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ON_CONDITIONS_FORMAT);
QRY_ERR_RET(
nodesNodeToSQL(pJoinNode->pPrimKeyCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
if (pJoinNode->pPrimKeyCond) {
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pPrimKeyCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
}
if (pJoinNode->pFullOnCond) {
if (pJoinNode->pPrimKeyCond) {
EXPLAIN_ROW_APPEND(" AND ");
}
QRY_ERR_RET(
nodesNodeToSQL(pJoinNode->pFullOnCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
}
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {

View File

@ -19,7 +19,7 @@
extern "C" {
#endif
#if 1
#if 0
#define MJOIN_DEFAULT_BLK_ROWS_NUM 2 //4096
#define MJOIN_HJOIN_CART_THRESHOLD 10
#define MJOIN_BLK_SIZE_LIMIT 0 //10485760

View File

@ -2696,11 +2696,14 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) {
SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i);
SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCol->srcSlot);
if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) < pCtx->winBeginTs) {
blockDataDestroy(pGrp->blk);
pCache->rowNum -= (pGrp->blk->info.rows - pGrp->beginIdx);
if (pGrp->blk == pCache->outBlk) {
blockDataCleanup(pGrp->blk);
}
taosArrayPopFrontBatch(pCache->grps, 1);
grpNum--;
i--;
pCache->rowNum -= (pGrp->blk->info.rows - pGrp->beginIdx);
continue;
}
@ -2712,7 +2715,12 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) {
if (*((int64_t*)pCol->pData + pGrp->beginIdx) <= pCtx->winEndTs) {
pGrp->readIdx = pGrp->beginIdx;
if (pGrp->endIdx < pGrp->beginIdx) {
pGrp->endIdx = pGrp->beginIdx;
pCache->rowNum = 1;
} else {
pCache->rowNum -= (pGrp->beginIdx - startIdx);
}
return TSDB_CODE_SUCCESS;
}
@ -2772,7 +2780,7 @@ _return:
int32_t mWinJoinMoveWinEnd(SMJoinWindowCtx* pCtx) {
SMJoinWinCache* pCache = &pCtx->cache;
int32_t grpNum = taosArrayGetSize(pCache->grps);
if (grpNum <= 0) {
if (grpNum <= 0 || pCache->rowNum >= pCtx->jLimit) {
return TSDB_CODE_SUCCESS;
}
@ -2790,9 +2798,13 @@ int32_t mWinJoinMoveWinEnd(SMJoinWindowCtx* pCtx) {
}
} else {
int32_t startIdx = pGrp->endIdx;
for (; pGrp->endIdx < pGrp->blk->info.rows && pCache->rowNum < pCtx->jLimit; pGrp->endIdx++) {
for (; pCache->rowNum < pCtx->jLimit && ++pGrp->endIdx < pGrp->blk->info.rows; ) {
if (*((int64_t*)pCol->pData + pGrp->endIdx) <= pCtx->winEndTs) {
pCache->rowNum++;
if ((pGrp->endIdx + 1) >= pGrp->blk->info.rows) {
break;
}
continue;
}
@ -2998,6 +3010,7 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ
if (JOIN_STYPE_ASOF == pJoinNode->subType || JOIN_STYPE_WIN == pJoinNode->subType) {
pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1;
pJoin->subType = JOIN_STYPE_OUTER;
pJoin->build->eqRowLimit = pCtx->jLimit;
} else {
pCtx->jLimit = -1;
}

View File

@ -66,7 +66,7 @@ enum {
};
#define COL_DISPLAY_WIDTH 18
#define JT_MAX_LOOP 50000
#define JT_MAX_LOOP 10000
#define LEFT_BLK_ID 0
#define RIGHT_BLK_ID 1
@ -199,7 +199,7 @@ typedef struct {
SJoinTestCtx jtCtx = {0};
SJoinTestCtrl jtCtrl = {1, 1, 1, 0};
SJoinTestCtrl jtCtrl = {0, 0, 0, 0};
SJoinTestStat jtStat = {0};
SJoinTestResInfo jtRes = {0};
@ -794,35 +794,34 @@ SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(SJoinTestParam* param
p->joinType = param->joinType;
p->subType = param->subType;
p->asofOpType = param->asofOp;
if (param->jLimit > 1 || taosRand() % 2) {
if (p->subType == JOIN_STYPE_WIN || param->jLimit > 1 || taosRand() % 2) {
SLimitNode* limitNode = (SLimitNode*)nodesMakeNode(QUERY_NODE_LIMIT);
limitNode->limit = param->jLimit;
p->pJLimit = (SNode*)limitNode;
}
p->leftPrimSlotId = JT_PRIM_TS_SLOT_ID;
p->rightPrimSlotId = JT_PRIM_TS_SLOT_ID;
p->node.inputTsOrder = param->asc ? ORDER_ASC : ORDER_DESC;
if (JOIN_STYPE_WIN == p->subType) {
SWindowOffsetNode* pOffset = (SWindowOffsetNode*)nodesMakeNode(QUERY_NODE_WINDOW_OFFSET);
SValueNode* pStart = nodesMakeNode(QUERY_NODE_VALUE);
SValueNode* pEnd = nodesMakeNode(QUERY_NODE_VALUE);
SValueNode* pStart = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
SValueNode* pEnd = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
pStart->node.resType.type = TSDB_DATA_TYPE_BIGINT;
pStart->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
pStart->datum.i = (taosRand() % 2) ? (-1 * taosRand() % JT_MAX_WINDOW_OFFSET) : (taosRand() % JT_MAX_WINDOW_OFFSET);
pStart->datum.i = (taosRand() % 2) ? (((int32_t)-1) * (int64_t)(taosRand() % JT_MAX_WINDOW_OFFSET)) : (taosRand() % JT_MAX_WINDOW_OFFSET);
pEnd->node.resType.type = TSDB_DATA_TYPE_BIGINT;
pEnd->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
pEnd->datum.i = (taosRand() % 2) ? (-1 * taosRand() % JT_MAX_WINDOW_OFFSET) : (taosRand() % JT_MAX_WINDOW_OFFSET);
pEnd->datum.i = (taosRand() % 2) ? (((int32_t)-1) * (int64_t)(taosRand() % JT_MAX_WINDOW_OFFSET)) : (taosRand() % JT_MAX_WINDOW_OFFSET);
if (pStart->datum.i > pEnd->datum.i) {
TSWAP(pStart->datum.i, pEnd->datum.i);
}
pOffset->pStartOffset = (SNode*)pStart;
pOffset->pEndOffset = (SNode*)pEnd;
p->pWindowOffset = (SNode*)pOffset;
if (pStart->datum.i <= pEnd->datum.i) {
jtCtx.winStartOffset = pStart->datum.i;
jtCtx.winEndOffset = pEnd->datum.i;
} else {
jtCtx.winStartOffset = pEnd->datum.i;
jtCtx.winEndOffset = pStart->datum.i;
}
}
jtCtx.joinType = param->joinType;
@ -1005,7 +1004,9 @@ void chkAppendAsofGreaterResRows(bool forceOut) {
void appendWinEachResGrps(char* leftInRow, int32_t rightOffset, int32_t rightRows) {
if (rightOffset < 0) {
if (0 == jtCtx.rightFilterNum) {
appendLeftNonMatchGrp(leftInRow);
}
return;
}
@ -1101,7 +1102,9 @@ void chkAppendWinResRows(bool forceOut) {
break;
}
if (0 == jtCtx.rightFilterNum) {
if (winBeginIdx >= 0) {
appendWinEachResGrps(leftRow, winBeginIdx, rightRows - winBeginIdx);
} else if (0 == jtCtx.rightFilterNum) {
appendLeftNonMatchGrp(leftRow);
}
}
@ -1142,12 +1145,12 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
bool keepRes = false;
bool keepInput = false;
if (blkId == LEFT_BLK_ID) {
if ((jtCtx.joinType == JOIN_TYPE_LEFT || jtCtx.joinType == JOIN_TYPE_FULL) && (jtCtx.subType != JOIN_STYPE_SEMI && jtCtx.subType != JOIN_STYPE_ASOF)) {
if ((jtCtx.joinType == JOIN_TYPE_LEFT || jtCtx.joinType == JOIN_TYPE_FULL) && (jtCtx.subType != JOIN_STYPE_SEMI && jtCtx.subType != JOIN_STYPE_ASOF && jtCtx.subType != JOIN_STYPE_WIN)) {
keepRes = true;
}
peerOffset = MAX_SLOT_NUM;
} else {
if ((jtCtx.joinType == JOIN_TYPE_RIGHT || jtCtx.joinType == JOIN_TYPE_FULL) && (jtCtx.subType != JOIN_STYPE_SEMI && jtCtx.subType != JOIN_STYPE_ASOF)) {
if ((jtCtx.joinType == JOIN_TYPE_RIGHT || jtCtx.joinType == JOIN_TYPE_FULL) && (jtCtx.subType != JOIN_STYPE_SEMI && jtCtx.subType != JOIN_STYPE_ASOF && jtCtx.subType != JOIN_STYPE_WIN)) {
keepRes = true;
}
tableOffset = MAX_SLOT_NUM;
@ -1183,7 +1186,7 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
taosArrayPush((blkId == LEFT_BLK_ID) ? jtCtx.leftBlkList : jtCtx.rightBlkList, ppBlk);
}
filterOut = (peerFilterNum > 0 && (jtCtx.subType != JOIN_STYPE_ASOF && jtCtx.subType != JOIN_STYPE_WIN) ? true : false;
filterOut = (peerFilterNum > 0 && (jtCtx.subType != JOIN_STYPE_ASOF && jtCtx.subType != JOIN_STYPE_WIN)) ? true : false;
if (!filterOut) {
memset(jtCtx.resColBuf, 0, jtCtx.resColSize);
if (keepInput) {
@ -2752,7 +2755,7 @@ void runSingleTest(char* caseName, SJoinTestParam* param) {
bool contLoop = true;
SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param);
createDummyBlkList(10, 10, 10, 10, 3);
createDummyBlkList(200, 200, 200, 200, 20);
while (contLoop) {
rerunBlockedHere();

View File

@ -4055,7 +4055,7 @@ static bool stbJoinOptShouldBeOptimized(SLogicNode* pNode) {
}
SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
if (pJoin->isSingleTableJoin || NULL == pJoin->pTagEqCond || pNode->pChildren->length != 2
if (JOIN_STYPE_NONE != pJoin->subType || pJoin->isSingleTableJoin || NULL == pJoin->pTagEqCond || pNode->pChildren->length != 2
|| pJoin->hasSubQuery || pJoin->joinAlgo != JOIN_ALGO_UNKNOWN || pJoin->isLowLevelJoin) {
if (pJoin->joinAlgo == JOIN_ALGO_UNKNOWN) {
pJoin->joinAlgo = JOIN_ALGO_MERGE;

View File

@ -68,6 +68,7 @@ run tsim/join/right_anti_join.sim
run tsim/join/left_asof_join.sim
run tsim/join/right_asof_join.sim
run tsim/join/left_win_join.sim
run tsim/join/right_win_join.sim
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
@ -85,5 +86,6 @@ run tsim/join/right_anti_join.sim
run tsim/join/left_asof_join.sim
run tsim/join/right_asof_join.sim
run tsim/join/left_win_join.sim
run tsim/join/right_win_join.sim
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,177 @@
sql connect
sql use test0;
sql_error select a.col1, b.col1 from sta a right window join sta b on a.ts = b.ts and a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' window_offset(-1s, 1s) order by a.col1, b.col1;
sql_error select a.col1, b.col1 from sta a right window join sta b on a.ts = b.ts order by a.col1, b.col1;
sql_error select a.col1, b.col1 from sta a right window join sta b on a.ts = b.ts window_offset(-1s, 1s) order by a.col1, b.col1;
sql select a.col1, b.col1 from sta a right window join sta b window_offset(-1s, 1s) order by a.col1, b.col1;
if $rows != 28 then
return -1
endi
sql select a.col1, b.col1 from sta a right window join sta b window_offset(-1s, 1s) jlimit 2 order by a.col1, b.col1;
if $rows != 16 then
return -1
endi
sql select a.col1, b.col1 from sta a right window join sta b window_offset(1s, 1s) order by a.col1, b.col1;
if $rows != 9 then
return -1
endi
sql select a.col1, b.col1 from sta a right window join sta b window_offset(-1s, 1s) where a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by a.col1, b.col1;
if $rows != 6 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data10 != 1 then
return -1
endi
if $data11 != 2 then
return -1
endi
if $data20 != 2 then
return -1
endi
if $data21 != 1 then
return -1
endi
if $data30 != 2 then
return -1
endi
if $data31 != 2 then
return -1
endi
if $data40 != 3 then
return -1
endi
if $data41 != 1 then
return -1
endi
if $data50 != 3 then
return -1
endi
if $data51 != 2 then
return -1
endi
sql select a.ts, b.ts from tba1 a right window join tba2 b window_offset(-1s, 1s)
if $rows != 7 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data10 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data11 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data20 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data21 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data30 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data31 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data40 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data41 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data50 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data51 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data60 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data61 != @23-11-17 16:29:05.000@ then
return -1
endi
sql select a.ts, b.ts from tba1 a right window join tba2 b window_offset(-1s, 1s) jlimit 1;
if $rows != 4 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data10 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data11 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data20 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data21 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data30 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data31 != @23-11-17 16:29:05.000@ then
return -1
endi
sql select a.ts, b.ts from tba1 a right window join tba2 b window_offset(-1a, 1a) jlimit 1;
if $rows != 4 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data10 != NULL then
return -1
endi
if $data11 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data20 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data21 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data30 != NULL then
return -1
endi
if $data31 != @23-11-17 16:29:05.000@ then
return -1
endi
sql select a.ts, b.ts from tba1 a right window join tba2 b window_offset(-1h, 1h);
if $rows != 16 then
return -1
endi
sql select a.ts, b.ts from tba1 a right window join tba2 b window_offset(1h, -1h);
if $rows != 16 then
return -1
endi
sql select a.ts, b.ts from tba1 a right window join tba2 b window_offset(1a, -1h);
if $rows != 9 then
return -1
endi