enh: add window join ut
This commit is contained in:
parent
ab87953362
commit
9b008716fe
|
@ -78,6 +78,7 @@ enum {
|
|||
#define ALL_TABLE_COLS (LEFT_TABLE_COLS | RIGHT_TABLE_COLS)
|
||||
|
||||
#define JT_MAX_JLIMIT 20
|
||||
#define JT_MAX_WINDOW_OFFSET 5
|
||||
#define JT_KEY_SOLT_ID (MAX_SLOT_NUM - 1)
|
||||
#define JT_PRIM_TS_SLOT_ID 0
|
||||
int32_t jtInputColType[MAX_SLOT_NUM] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_BIGINT};
|
||||
|
@ -116,6 +117,8 @@ typedef struct {
|
|||
int32_t subType;
|
||||
int32_t asofOpType;
|
||||
int64_t jLimit;
|
||||
int64_t winStartOffset;
|
||||
int64_t winEndOffset;
|
||||
|
||||
int32_t leftTotalRows;
|
||||
int32_t rightTotalRows;
|
||||
|
@ -796,9 +799,31 @@ SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(SJoinTestParam* param
|
|||
limitNode->limit = param->jLimit;
|
||||
p->pJLimit = (SNode*)limitNode;
|
||||
}
|
||||
p->leftPrimSlotId = 0;
|
||||
p->rightPrimSlotId = 0;
|
||||
p->leftPrimSlotId = JT_PRIM_TS_SLOT_ID;
|
||||
p->rightPrimSlotId = JT_PRIM_TS_SLOT_ID;
|
||||
p->node.inputTsOrder = param->asc ? ORDER_ASC : ORDER_DESC;
|
||||
if (JOIN_STYPE_WIN == p->subType) {
|
||||
SWindowOffsetNode* pOffset = (SWindowOffsetNode*)nodesMakeNode(QUERY_NODE_WINDOW_OFFSET);
|
||||
SValueNode* pStart = nodesMakeNode(QUERY_NODE_VALUE);
|
||||
SValueNode* pEnd = nodesMakeNode(QUERY_NODE_VALUE);
|
||||
pStart->node.resType.type = TSDB_DATA_TYPE_BIGINT;
|
||||
pStart->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
|
||||
pStart->datum.i = (taosRand() % 2) ? (-1 * taosRand() % JT_MAX_WINDOW_OFFSET) : (taosRand() % JT_MAX_WINDOW_OFFSET);
|
||||
pEnd->node.resType.type = TSDB_DATA_TYPE_BIGINT;
|
||||
pEnd->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
|
||||
pEnd->datum.i = (taosRand() % 2) ? (-1 * taosRand() % JT_MAX_WINDOW_OFFSET) : (taosRand() % JT_MAX_WINDOW_OFFSET);
|
||||
pOffset->pStartOffset = (SNode*)pStart;
|
||||
pOffset->pEndOffset = (SNode*)pEnd;
|
||||
p->pWindowOffset = (SNode*)pOffset;
|
||||
|
||||
if (pStart->datum.i <= pEnd->datum.i) {
|
||||
jtCtx.winStartOffset = pStart->datum.i;
|
||||
jtCtx.winEndOffset = pEnd->datum.i;
|
||||
} else {
|
||||
jtCtx.winStartOffset = pEnd->datum.i;
|
||||
jtCtx.winEndOffset = pStart->datum.i;
|
||||
}
|
||||
}
|
||||
|
||||
jtCtx.joinType = param->joinType;
|
||||
jtCtx.subType = param->subType;
|
||||
|
@ -883,7 +908,7 @@ void appendAsofLeftEachResGrps(char* leftInRow, int32_t rightOffset, int32_t rig
|
|||
}
|
||||
}
|
||||
|
||||
void appendAsofLeftNonMatchGrp(char* leftInRow) {
|
||||
void appendLeftNonMatchGrp(char* leftInRow) {
|
||||
memset(jtCtx.resColBuf, 0, jtCtx.resColSize);
|
||||
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
|
||||
if (!jtCtx.resColList[c]) {
|
||||
|
@ -913,7 +938,7 @@ void appendAllAsofResRows() {
|
|||
if (0 == jtCtx.rightFilterNum) {
|
||||
for (int32_t i = 0; i < leftRows; ++i) {
|
||||
char* leftInRow = (char*)taosArrayGet(jtCtx.leftRowsList, i);
|
||||
appendAsofLeftNonMatchGrp(leftInRow);
|
||||
appendLeftNonMatchGrp(leftInRow);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -967,7 +992,7 @@ void chkAppendAsofGreaterResRows(bool forceOut) {
|
|||
}
|
||||
|
||||
if (0 == jtCtx.rightFilterNum) {
|
||||
appendAsofLeftNonMatchGrp(leftRow);
|
||||
appendLeftNonMatchGrp(leftRow);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -977,6 +1002,117 @@ void chkAppendAsofGreaterResRows(bool forceOut) {
|
|||
taosArrayPopFrontBatch(jtCtx.leftRowsList, i);
|
||||
}
|
||||
|
||||
|
||||
void appendWinEachResGrps(char* leftInRow, int32_t rightOffset, int32_t rightRows) {
|
||||
if (rightOffset < 0) {
|
||||
appendLeftNonMatchGrp(leftInRow);
|
||||
return;
|
||||
}
|
||||
|
||||
memset(jtCtx.resColBuf, 0, jtCtx.resColSize);
|
||||
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
|
||||
if (!jtCtx.resColList[c]) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (*((bool*)leftInRow + c)) {
|
||||
*(char*)(jtCtx.resColBuf + c) = true;
|
||||
} else {
|
||||
memcpy(jtCtx.resColBuf + jtCtx.resColOffset[c], leftInRow + jtCtx.inColOffset[c], tDataTypes[jtInputColType[c]].bytes);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t endIdx = rightRows + rightOffset;
|
||||
for (int32_t r = rightOffset; r < endIdx; ++r) {
|
||||
bool* rightFilterOut = (bool*)taosArrayGet(jtCtx.rightFilterOut, r);
|
||||
if (*rightFilterOut) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char* rightResRows = (char*)taosArrayGet(jtCtx.rightRowsList, r);
|
||||
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
|
||||
if (jtCtx.resColList[MAX_SLOT_NUM + c]) {
|
||||
if (*(bool*)(rightResRows + c)) {
|
||||
*(bool*)(jtCtx.resColBuf + MAX_SLOT_NUM + c) = true;
|
||||
memset(jtCtx.resColBuf + jtCtx.resColOffset[MAX_SLOT_NUM + c], 0, tDataTypes[jtInputColType[c]].bytes);
|
||||
} else {
|
||||
*(bool*)(jtCtx.resColBuf + MAX_SLOT_NUM + c) = false;
|
||||
memcpy(jtCtx.resColBuf + jtCtx.resColOffset[MAX_SLOT_NUM + c], rightResRows + jtCtx.inColOffset[c], tDataTypes[jtInputColType[c]].bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pushResRow(jtCtx.resColBuf, jtCtx.resColSize);
|
||||
}
|
||||
}
|
||||
|
||||
void chkAppendWinResRows(bool forceOut) {
|
||||
int32_t rightRows = taosArrayGetSize(jtCtx.rightRowsList);
|
||||
if (rightRows < jtCtx.jLimit && !forceOut) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t rightRemains = rightRows;
|
||||
int32_t rightOffset = 0;
|
||||
int32_t leftRows = taosArrayGetSize(jtCtx.leftRowsList);
|
||||
int32_t i = 0;
|
||||
for (; i < leftRows; ++i) {
|
||||
char* leftRow = (char*)taosArrayGet(jtCtx.leftRowsList, i);
|
||||
int64_t* leftTs = (int64_t*)(leftRow + jtCtx.inColOffset[JT_PRIM_TS_SLOT_ID]);
|
||||
int64_t winStart = *leftTs + jtCtx.winStartOffset;
|
||||
int64_t winEnd = *leftTs + jtCtx.winEndOffset;
|
||||
int32_t winBeginIdx = -1;
|
||||
bool append = false;
|
||||
bool winClosed = false;
|
||||
for (int32_t r = rightOffset; r < rightRows; ++r) {
|
||||
char* rightRow = (char*)taosArrayGet(jtCtx.rightRowsList, r);
|
||||
int64_t* rightTs = (int64_t*)(rightRow + jtCtx.inColOffset[JT_PRIM_TS_SLOT_ID]);
|
||||
if (*rightTs < winStart) {
|
||||
rightOffset++;
|
||||
rightRemains--;
|
||||
if (rightRemains < jtCtx.jLimit && !forceOut) {
|
||||
taosArrayPopFrontBatch(jtCtx.rightRowsList, rightOffset);
|
||||
taosArrayPopFrontBatch(jtCtx.rightFilterOut, rightOffset);
|
||||
taosArrayPopFrontBatch(jtCtx.leftRowsList, i);
|
||||
return;
|
||||
}
|
||||
|
||||
continue;
|
||||
} else if (*rightTs > winEnd) {
|
||||
winClosed = true;
|
||||
appendWinEachResGrps(leftRow, winBeginIdx, r - winBeginIdx);
|
||||
append = true;
|
||||
break;
|
||||
}
|
||||
|
||||
if (-1 == winBeginIdx) {
|
||||
winBeginIdx = r;
|
||||
}
|
||||
|
||||
if ((r - winBeginIdx + 1) >= jtCtx.jLimit) {
|
||||
appendWinEachResGrps(leftRow, winBeginIdx, jtCtx.jLimit);
|
||||
append = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!append) {
|
||||
if (!forceOut) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (0 == jtCtx.rightFilterNum) {
|
||||
appendLeftNonMatchGrp(leftRow);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayPopFrontBatch(jtCtx.rightRowsList, rightOffset);
|
||||
taosArrayPopFrontBatch(jtCtx.rightFilterOut, rightOffset);
|
||||
taosArrayPopFrontBatch(jtCtx.leftRowsList, i);
|
||||
}
|
||||
|
||||
|
||||
void trimForAsofJlimit() {
|
||||
int32_t rowNum = taosArrayGetSize(jtCtx.rightRowsList);
|
||||
if (rowNum <= jtCtx.jLimit) {
|
||||
|
@ -1020,6 +1156,9 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
|
|||
if (JOIN_STYPE_ASOF == jtCtx.subType) {
|
||||
keepInput = jtCtx.asofOpType != OP_TYPE_EQUAL ? true : (blkId == LEFT_BLK_ID);
|
||||
pTableRows = (blkId == LEFT_BLK_ID) ? jtCtx.leftRowsList : jtCtx.rightRowsList;
|
||||
} else if (JOIN_STYPE_WIN == jtCtx.subType) {
|
||||
keepInput = true;
|
||||
pTableRows = (blkId == LEFT_BLK_ID) ? jtCtx.leftRowsList : jtCtx.rightRowsList;
|
||||
}
|
||||
|
||||
int32_t filterNum = (blkId == LEFT_BLK_ID) ? jtCtx.leftFilterNum : jtCtx.rightFilterNum;
|
||||
|
@ -1044,7 +1183,7 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
|
|||
taosArrayPush((blkId == LEFT_BLK_ID) ? jtCtx.leftBlkList : jtCtx.rightBlkList, ppBlk);
|
||||
}
|
||||
|
||||
filterOut = (peerFilterNum > 0 && jtCtx.subType != JOIN_STYPE_ASOF) ? true : false;
|
||||
filterOut = (peerFilterNum > 0 && (jtCtx.subType != JOIN_STYPE_ASOF && jtCtx.subType != JOIN_STYPE_WIN) ? true : false;
|
||||
if (!filterOut) {
|
||||
memset(jtCtx.resColBuf, 0, jtCtx.resColSize);
|
||||
if (keepInput) {
|
||||
|
@ -1135,14 +1274,18 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
|
|||
}
|
||||
|
||||
if (keepInput) {
|
||||
if (jtCtx.asofOpType == OP_TYPE_GREATER_EQUAL || jtCtx.asofOpType == OP_TYPE_GREATER_THAN || jtCtx.asofOpType == OP_TYPE_EQUAL) {
|
||||
if (blkId == LEFT_BLK_ID) {
|
||||
appendAllAsofResRows();
|
||||
if (JOIN_STYPE_ASOF == jtCtx.subType) {
|
||||
if (jtCtx.asofOpType == OP_TYPE_GREATER_EQUAL || jtCtx.asofOpType == OP_TYPE_GREATER_THAN || jtCtx.asofOpType == OP_TYPE_EQUAL) {
|
||||
if (blkId == LEFT_BLK_ID) {
|
||||
appendAllAsofResRows();
|
||||
} else {
|
||||
trimForAsofJlimit();
|
||||
}
|
||||
} else {
|
||||
trimForAsofJlimit();
|
||||
chkAppendAsofGreaterResRows(false);
|
||||
}
|
||||
} else {
|
||||
chkAppendAsofGreaterResRows(false);
|
||||
chkAppendWinResRows(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1749,7 +1892,7 @@ void antiJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) {
|
|||
|
||||
void addAsofEqInRows(int32_t rowsNum, int64_t tbOffset, bool leftTable) {
|
||||
bool filterOut = false;
|
||||
void* cvalue = NULL, *filterValue = NULL;
|
||||
void* cvalue = NULL;
|
||||
int64_t cbig = 0, fbig = 0;
|
||||
int32_t filterNum = leftTable ? jtCtx.leftFilterNum : jtCtx.rightFilterNum;
|
||||
int32_t* filterCol = leftTable ? jtCtx.leftFilterColList : jtCtx.rightFilterColList;
|
||||
|
@ -1805,9 +1948,6 @@ void addAsofEqInRows(int32_t rowsNum, int64_t tbOffset, bool leftTable) {
|
|||
}
|
||||
|
||||
void asofJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) {
|
||||
bool filterOut = false;
|
||||
void* lValue = NULL, *rValue = NULL, *filterValue = NULL;
|
||||
int64_t lBig = 0, rBig = 0, fbig = 0;
|
||||
int64_t rightTbOffset = jtCtx.blkRowSize * leftGrpRows;
|
||||
|
||||
switch (jtCtx.asofOpType) {
|
||||
|
@ -1844,6 +1984,66 @@ void asofJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) {
|
|||
}
|
||||
|
||||
|
||||
void addWinEqInRows(int32_t rowsNum, int64_t tbOffset, bool leftTable) {
|
||||
bool filterOut = false;
|
||||
void* cvalue = NULL;
|
||||
int64_t cbig = 0, fbig = 0;
|
||||
int32_t filterNum = leftTable ? jtCtx.leftFilterNum : jtCtx.rightFilterNum;
|
||||
int32_t* filterCol = leftTable ? jtCtx.leftFilterColList : jtCtx.rightFilterColList;
|
||||
SArray* rowList = leftTable ? jtCtx.leftRowsList : jtCtx.rightRowsList;
|
||||
|
||||
for (int32_t l = 0; l < rowsNum; ++l) {
|
||||
char* row = jtCtx.colRowDataBuf + tbOffset + jtCtx.blkRowSize * l;
|
||||
|
||||
filterOut = false;
|
||||
|
||||
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
|
||||
cvalue = row + jtCtx.colRowOffset[c];
|
||||
switch (jtInputColType[c]) {
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
fbig = TIMESTAMP_FILTER_VALUE;
|
||||
cbig = *(int64_t*)cvalue;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
fbig = INT_FILTER_VALUE;
|
||||
cbig = *(int32_t*)cvalue;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
fbig = BIGINT_FILTER_VALUE;
|
||||
cbig = *(int64_t*)cvalue;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
if (filterNum && filterCol[c] && ((*(bool*)(row + c)) || cbig <= fbig)) {
|
||||
filterOut = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (filterOut && leftTable) {
|
||||
continue;
|
||||
}
|
||||
|
||||
taosArrayPush(rowList, row);
|
||||
if (!leftTable) {
|
||||
taosArrayPush(jtCtx.rightFilterOut, &filterOut);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void winJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) {
|
||||
int64_t rightTbOffset = jtCtx.blkRowSize * leftGrpRows;
|
||||
|
||||
addWinEqInRows(leftGrpRows, 0, true);
|
||||
addWinEqInRows(rightGrpRows, rightTbOffset, false);
|
||||
chkAppendWinResRows(false);
|
||||
}
|
||||
|
||||
|
||||
|
||||
void fullJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) {
|
||||
bool leftMatch = false, rightMatch = false, lfilterOut = false, rfilterOut = false;
|
||||
void* lValue = NULL, *rValue = NULL, *filterValue = NULL;
|
||||
|
@ -1979,6 +2179,9 @@ void appendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) {
|
|||
case JOIN_STYPE_ASOF:
|
||||
asofJoinAppendEqGrpRes(leftGrpRows, rightGrpRows);
|
||||
break;
|
||||
case JOIN_STYPE_WIN:
|
||||
winJoinAppendEqGrpRes(leftGrpRows, rightGrpRows);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -2083,6 +2286,8 @@ void createBothBlkRowsData(void) {
|
|||
if (JOIN_STYPE_ASOF == jtCtx.subType && taosArrayGetSize(jtCtx.leftRowsList) > 0) {
|
||||
ASSERT(OP_TYPE_LOWER_EQUAL == jtCtx.asofOpType || OP_TYPE_LOWER_THAN == jtCtx.asofOpType);
|
||||
chkAppendAsofGreaterResRows(true);
|
||||
} else if (JOIN_STYPE_WIN == jtCtx.subType && taosArrayGetSize(jtCtx.leftRowsList) > 0) {
|
||||
chkAppendWinResRows(true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2284,6 +2489,8 @@ void printBasicInfo(char* caseName) {
|
|||
|
||||
if (JOIN_STYPE_ASOF == jtCtx.subType) {
|
||||
printf("\t asofOp:%s\n\t JLimit:%" PRId64 "\n", getAsofOpStr(), jtCtx.jLimit);
|
||||
} else if (JOIN_STYPE_WIN == jtCtx.subType) {
|
||||
printf("\t windowOffset:[%" PRId64 ", %" PRId64 "]\n\t JLimit:%" PRId64 "\n", jtCtx.winStartOffset, jtCtx.winEndOffset, jtCtx.jLimit);
|
||||
}
|
||||
|
||||
printf("Input Info:\n\t totalBlk:left-%d right-%d\n\t totalRows:left-%d right-%d\n\t "
|
||||
|
@ -2545,7 +2752,7 @@ void runSingleTest(char* caseName, SJoinTestParam* param) {
|
|||
bool contLoop = true;
|
||||
|
||||
SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param);
|
||||
createDummyBlkList(200, 200, 200, 200, 10);
|
||||
createDummyBlkList(10, 10, 10, 10, 3);
|
||||
|
||||
while (contLoop) {
|
||||
rerunBlockedHere();
|
||||
|
@ -3091,8 +3298,8 @@ TEST(leftAntiJoin, fullCondTest) {
|
|||
#endif
|
||||
#endif
|
||||
|
||||
#if 1
|
||||
#if 0
|
||||
#if 1
|
||||
TEST(leftAsofJoin, noCondGreaterThanTest) {
|
||||
SJoinTestParam param;
|
||||
char* caseName = "leftAsofJoin:noCondGreaterThanTest";
|
||||
|
@ -3120,7 +3327,7 @@ TEST(leftAsofJoin, noCondGreaterThanTest) {
|
|||
}
|
||||
#endif
|
||||
|
||||
#if 0
|
||||
#if 1
|
||||
TEST(leftAsofJoin, noCondGreaterEqTest) {
|
||||
SJoinTestParam param;
|
||||
char* caseName = "leftAsofJoin:noCondGreaterEqTest";
|
||||
|
@ -3148,7 +3355,7 @@ TEST(leftAsofJoin, noCondGreaterEqTest) {
|
|||
}
|
||||
#endif
|
||||
|
||||
#if 0
|
||||
#if 1
|
||||
TEST(leftAsofJoin, noCondEqTest) {
|
||||
SJoinTestParam param;
|
||||
char* caseName = "leftAsofJoin:noCondEqTest";
|
||||
|
@ -3236,6 +3443,38 @@ TEST(leftAsofJoin, noCondLowerEqTest) {
|
|||
#endif
|
||||
|
||||
|
||||
#if 1
|
||||
#if 1
|
||||
TEST(leftWinJoin, noCondProjectionTest) {
|
||||
SJoinTestParam param;
|
||||
char* caseName = "leftWinJoin:noCondProjectionTest";
|
||||
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
|
||||
|
||||
param.pTask = pTask;
|
||||
param.joinType = JOIN_TYPE_LEFT;
|
||||
param.subType = JOIN_STYPE_WIN;
|
||||
param.cond = TEST_NO_COND;
|
||||
param.asc = true;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1;
|
||||
|
||||
param.filter = false;
|
||||
runSingleTest(caseName, ¶m);
|
||||
|
||||
param.filter = true;
|
||||
runSingleTest(caseName, ¶m);
|
||||
}
|
||||
|
||||
printStatInfo(caseName);
|
||||
taosMemoryFree(pTask);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
taosSeedRand(taosGetTimestampSec());
|
||||
|
|
|
@ -3268,6 +3268,13 @@ static int32_t translateJoinTable(STranslateContext* pCxt, SJoinTableNode* pJoin
|
|||
return buildInvalidOperationMsg(&pCxt->msgBuf, "WINDOW_OFFSET only supported for WINDOW join");
|
||||
}
|
||||
code = translateExpr(pCxt, &pJoinTable->pWindowOffset);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SValueNode* pStart = (SValueNode*)((SWindowOffsetNode*)pJoinTable->pWindowOffset)->pStartOffset;
|
||||
SValueNode* pEnd = (SValueNode*)((SWindowOffsetNode*)pJoinTable->pWindowOffset)->pEndOffset;
|
||||
if (pStart->datum.i > pEnd->datum.i) {
|
||||
TSWAP(((SWindowOffsetNode*)pJoinTable->pWindowOffset)->pStartOffset, ((SWindowOffsetNode*)pJoinTable->pWindowOffset)->pEndOffset);
|
||||
}
|
||||
}
|
||||
} else if (*pSType == JOIN_STYPE_WIN) {
|
||||
return buildInvalidOperationMsg(&pCxt->msgBuf, "WINDOW_OFFSET required for WINDOW join");
|
||||
}
|
||||
|
|
|
@ -166,3 +166,12 @@ sql select a.ts, b.ts from tba1 a left window join tba2 b window_offset(-1h, 1h)
|
|||
if $rows != 16 then
|
||||
return -1
|
||||
endi
|
||||
sql select a.ts, b.ts from tba1 a left window join tba2 b window_offset(1h, -1h);
|
||||
if $rows != 16 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.ts from tba1 a left window join tba2 b window_offset(1a, -1h);
|
||||
if $rows != 9 then
|
||||
return -1
|
||||
endi
|
||||
|
|
Loading…
Reference in New Issue