diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index e716f070b8..1cd904bc48 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -79,6 +79,7 @@ enum { #define JT_MAX_JLIMIT 3 #define JT_KEY_SOLT_ID (MAX_SLOT_NUM - 1) +#define JT_PRIM_TS_SLOT_ID 0 int32_t jtInputColType[MAX_SLOT_NUM] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_BIGINT}; char* jtColCondStr[] = {"", "NO COND", "EQ COND", "ON COND", "FULL COND"}; @@ -169,9 +170,10 @@ typedef struct { int32_t rightFinMatchNum; bool* rightFinMatch; - int64_t leftRowsNum; + int32_t inColOffset[MAX_SLOT_NUM]; + int32_t inColSize; + char* inColBuf; SArray* leftRowsList; - int64_t rightRowsNum; SArray* rightRowsList; SArray* rightFilterOut; } SJoinTestCtx; @@ -843,50 +845,130 @@ SSDataBlock* createDummyBlock(int32_t blkId) { return p; } +void appendAsofLeftEachResGrps(char* leftInRow, int32_t rightOffset, int32_t rightRows) { + memset(jtCtx.resColBuf, 0, jtCtx.resColSize); + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + if (!jtCtx.resColList[c]) { + continue; + } + + if (*((bool*)leftInRow + c)) { + *(char*)(jtCtx.resColBuf + c) = true; + } else { + memcpy(jtCtx.resColBuf + jtCtx.resColOffset[c], leftInRow + jtCtx.inColOffset[c], tDataTypes[jtInputColType[c]].bytes); + } + } + + for (int32_t r = rightOffset; r < rightRows; ++r) { + bool* rightFilterOut = taosArrayGet(jtCtx.rightFilterOut, r); + if (*rightFilterOut) { + continue; + } + + char* rightResRows = taosArrayGet(jtCtx.rightRowsList, r); + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + if (jtCtx.resColList[MAX_SLOT_NUM + c]) { + if (*(bool*)(rightResRows + MAX_SLOT_NUM + c)) { + *(bool*)(jtCtx.resColBuf + MAX_SLOT_NUM + c) = true; + } else { + *(bool*)(jtCtx.resColBuf + MAX_SLOT_NUM + c) = false; + memcpy(jtCtx.resColBuf + jtCtx.resColOffset[MAX_SLOT_NUM + c], rightResRows + jtCtx.inColOffset[c], tDataTypes[jtInputColType[c]].bytes); + } + } + } + + pushResRow(jtCtx.resColBuf, jtCtx.resColSize); + } +} + +void appendAsofLeftNonMatchGrp(char* leftInRow) { + memset(jtCtx.resColBuf, 0, jtCtx.resColSize); + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + if (!jtCtx.resColList[c]) { + continue; + } + + if (*((bool*)leftInRow + c)) { + *(char*)(jtCtx.resColBuf + c) = true; + } else { + memcpy(jtCtx.resColBuf + jtCtx.resColOffset[c], leftInRow + jtCtx.inColOffset[c], tDataTypes[jtInputColType[c]].bytes); + } + } + + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + if (jtCtx.resColList[MAX_SLOT_NUM + c]) { + *(char*)(jtCtx.resColBuf + MAX_SLOT_NUM + c) = true; + } + } + + pushResRow(jtCtx.resColBuf, jtCtx.resColSize); +} + void appendAllAsofResRows() { int32_t leftRows = taosArrayGetSize(jtCtx.leftRowsList); int32_t rightRows = taosArrayGetSize(jtCtx.rightRowsList); if (rightRows <= 0) { for (int32_t i = 0; i < leftRows; ++i) { - char* leftResRows = taosArrayGet(jtCtx.leftRowsList, i); - for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { - if (jtCtx.resColList[MAX_SLOT_NUM + c]) { - *(char*)(leftResRows + MAX_SLOT_NUM + c) = true; - } - } - - pushResRow(leftResRows, jtCtx.resColSize); + char* leftInRow = taosArrayGet(jtCtx.leftRowsList, i); + appendAsofLeftNonMatchGrp(leftInRow); } } else { ASSERT(rightRows <= jtCtx.jLimit); for (int32_t i = 0; i < leftRows; ++i) { - char* leftResRows = taosArrayGet(jtCtx.leftRowsList, i); - for (int32_t r = 0; r < rightRows; ++r) { - char* rightResRows = taosArrayGet(jtCtx.rightRowsList, r); - for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { - if (jtCtx.resColList[MAX_SLOT_NUM + c]) { - if (*(bool*)(rightResRows + MAX_SLOT_NUM + c)) { - *(bool*)(leftResRows + MAX_SLOT_NUM + c) = true; - } else { - *(bool*)(leftResRows + MAX_SLOT_NUM + c) = false; - memcpy(leftResRows + jtCtx.resColOffset[MAX_SLOT_NUM + c], rightResRows + jtCtx.resColOffset[MAX_SLOT_NUM + c], tDataTypes[jtInputColType[c]].bytes); - } - } - } - - pushResRow(leftResRows, jtCtx.resColSize); - } + char* leftInRow = taosArrayGet(jtCtx.leftRowsList, i); + appendAsofLeftEachResGrps(leftInRow, 0, rightRows); } } + taosArrayClear(jtCtx.leftRowsList); } -void checkAppendAsofResRows(bool forceOut) { +void chkAppendAsofGreaterResRows(bool forceOut) { int32_t rightRows = taosArrayGetSize(jtCtx.rightRowsList); - if (rightRows <= 0 && !forceOut) { + if (rightRows < jtCtx.jLimit && !forceOut) { return; } - + int32_t rightRemains = rightRows; + int32_t rightOffset = 0; + int32_t leftRows = taosArrayGetSize(jtCtx.leftRowsList); + int32_t i = 0; + for (; i < leftRows; ++i) { + char* leftRow = taosArrayGet(jtCtx.leftRowsList, i); + int64_t* leftTs = (int64_t*)(leftRow + jtCtx.inColOffset[JT_PRIM_TS_SLOT_ID]); + bool append = false; + for (int32_t r = rightOffset; r < rightRows; ++r) { + char* rightRow = taosArrayGet(jtCtx.rightRowsList, r); + int64_t* rightTs = (int64_t*)(rightRow + jtCtx.inColOffset[JT_PRIM_TS_SLOT_ID]); + if ((*leftTs > *rightTs) || (*leftTs == *rightTs && OP_TYPE_LOWER_THAN == jtCtx.asofOpType)) { + rightOffset++; + rightRemains--; + if (rightRemains < jtCtx.jLimit && !forceOut) { + taosArrayPopFrontBatch(jtCtx.rightRowsList, rightOffset); + taosArrayPopFrontBatch(jtCtx.rightFilterOut, rightOffset); + taosArrayPopFrontBatch(jtCtx.leftRowsList, i); + return; + } + + continue; + } + + appendAsofLeftEachResGrps(leftRow, rightOffset, jtCtx.jLimit); + append = true; + break; + } + + if (!append) { + if (!forceOut) { + break; + } + + appendAsofLeftNonMatchGrp(leftRow); + } + } + + taosArrayPopFrontBatch(jtCtx.rightRowsList, rightOffset); + taosArrayPopFrontBatch(jtCtx.rightFilterOut, rightOffset); + taosArrayPopFrontBatch(jtCtx.leftRowsList, i); } void trimForAsofJlimit() { @@ -933,15 +1015,13 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { keepInput = true; if (blkId == LEFT_BLK_ID) { if (NULL == jtCtx.leftRowsList) { - jtCtx.leftRowsList = taosArrayInit(jtCtx.jLimit, jtCtx.resColSize); - jtCtx.leftRowsNum = 0; + jtCtx.leftRowsList = taosArrayInit(jtCtx.jLimit, jtCtx.inColSize); } pTableRows = jtCtx.leftRowsList; } else { if (NULL == jtCtx.rightRowsList) { - jtCtx.rightRowsList = taosArrayInit(jtCtx.jLimit, jtCtx.resColSize); + jtCtx.rightRowsList = taosArrayInit(jtCtx.jLimit, jtCtx.inColSize); jtCtx.rightFilterOut = taosArrayInit(jtCtx.jLimit, sizeof(bool)); - jtCtx.rightRowsNum = 0; } pTableRows = jtCtx.rightRowsList; } @@ -1016,12 +1096,12 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet((*ppBlk)->pDataBlock, c); colDataSetVal(pCol, (*ppBlk)->info.rows, pData, isNull); - if (keepInput && jtCtx.resColList[tableOffset + c]) { - if (!filterOut || (filterOut && blkId != LEFT_BLK_ID)) { + if (keepInput) { + if (!filterOut || (blkId != LEFT_BLK_ID)) { if (isNull) { - *(char*)(jtCtx.resColBuf + tableOffset + c) = true; + *(char*)(jtCtx.inColBuf + c) = true; } else { - memcpy(jtCtx.resColBuf + jtCtx.resColOffset[tableOffset + c], pData, tDataTypes[jtInputColType[c]].bytes); + memcpy(jtCtx.inColBuf + jtCtx.inColOffset[c], pData, tDataTypes[jtInputColType[c]].bytes); } addToRowList = true; } @@ -1035,7 +1115,7 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { } if (keepInput && addToRowList) { - taosArrayPush(pTableRows, jtCtx.resColBuf); + taosArrayPush(pTableRows, jtCtx.inColBuf); bool fout = filterOut ? true : false; taosArrayPush(jtCtx.rightFilterOut, &fout); } @@ -1061,7 +1141,7 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { trimForAsofJlimit(); } } else { - checkAppendAsofResRows(); + chkAppendAsofGreaterResRows(); } } @@ -1666,6 +1746,99 @@ void antiJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { } +void addAsofEqInRows(int32_t rowsNum, int64_t tbOffset, bool leftTable) { + bool filterOut = false; + void* cvalue = NULL, *filterValue = NULL; + int64_t cbig = 0, fbig = 0; + int32_t filterNum = leftTable ? jtCtx.leftFilterNum : jtCtx.rightFilterNum; + int32_t* filterCol = leftTable ? jtCtx.leftFilterColList : jtCtx.rightFilterColList; + SArray* rowList = leftTable ? jtCtx.leftRowsList : jtCtx.rightRowsList; + + if (!leftTable) { + rowsNum = TMIN(rowsNum, jtCtx.jLimit); + } + + for (int32_t l = 0; l < rowsNum; ++l) { + char* row = jtCtx.colRowDataBuf + tbOffset + jtCtx.blkRowSize * l; + + filterOut = false; + + for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { + cvalue = row + jtCtx.colRowOffset[c]; + switch (jtInputColType[c]) { + case TSDB_DATA_TYPE_TIMESTAMP: + fbig = TIMESTAMP_FILTER_VALUE; + cbig = *(int64_t*)cvalue; + break; + case TSDB_DATA_TYPE_INT: + fbig = INT_FILTER_VALUE; + cbig = *(int32_t*)cvalue; + break; + case TSDB_DATA_TYPE_BIGINT: + fbig = BIGINT_FILTER_VALUE; + cbig = *(int64_t*)cvalue; + break; + default: + break; + } + + if (filterNum && filterCol[c] && ((*(bool*)(row + c)) || cbig <= fbig)) { + filterOut = true; + break; + } + } + + if (filterOut && leftTable) { + continue; + } + + taosArrayPush(rowList, row); + if (!leftTable) { + taosArrayPush(jtCtx.rightFilterOut, &filterOut); + } + } + + if (!leftTable) { + trimForAsofJlimit(); + } +} + +void asofJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { + bool filterOut = false; + void* lValue = NULL, *rValue = NULL, *filterValue = NULL; + int64_t lBig = 0, rBig = 0, fbig = 0; + int64_t rightTbOffset = jtCtx.blkRowSize * leftGrpRows; + + switch (jtCtx.asofOpType) { + case OP_TYPE_GREATER_THAN: + addAsofEqInRows(leftGrpRows, 0, true); + appendAllAsofResRows(); + addAsofEqInRows(rightGrpRows, rightTbOffset, false); + break; + case OP_TYPE_GREATER_EQUAL: + addAsofEqInRows(leftGrpRows, 0, true); + addAsofEqInRows(rightGrpRows, rightTbOffset, false); + appendAllAsofResRows(); + break; + case OP_TYPE_LOWER_THAN: + case OP_TYPE_LOWER_EQUAL: + addAsofEqInRows(leftGrpRows, 0, true); + addAsofEqInRows(rightGrpRows, rightTbOffset, false); + chkAppendAsofGreaterResRows(false); + break; + case OP_TYPE_EQUAL: + taosArrayClear(jtCtx.leftRowsList); + taosArrayClear(jtCtx.rightRowsList); + taosArrayClear(jtCtx.rightFilterOut); + addAsofEqInRows(leftGrpRows, 0, true); + addAsofEqInRows(rightGrpRows, rightTbOffset, false); + chkAppendAsofGreaterResRows(true); + break; + default: + return; + } +} + void fullJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { bool leftMatch = false, rightMatch = false, lfilterOut = false, rfilterOut = false; @@ -1799,6 +1972,9 @@ void appendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) { case JOIN_STYPE_ANTI: antiJoinAppendEqGrpRes(leftGrpRows, rightGrpRows); break; + case JOIN_STYPE_ASOF: + asofJoinAppendEqGrpRes(leftGrpRows, rightGrpRows); + break; default: break; } @@ -1899,6 +2075,11 @@ void createBothBlkRowsData(void) { break; } } + + if (JOIN_STYPE_ASOF == jtCtx.subType && taosArrayGetSize(jtCtx.leftRowsList) > 0) { + ASSERT(OP_TYPE_LOWER_EQUAL == jtCtx.asofOpType || OP_TYPE_LOWER_THAN == jtCtx.asofOpType); + chkAppendAsofGreaterResRows(true); + } } void createDummyBlkList(int32_t leftMaxRows, int32_t leftMaxGrpRows, int32_t rightMaxRows, int32_t rightMaxGrpRows, int32_t blkRows) { @@ -2288,6 +2469,14 @@ void initJoinTest() { jtStat.pHistory = taosArrayInit(100000, sizeof(SJoinTestHistory)); } + int32_t offset = MAX_SLOT_NUM * sizeof(bool); + for (int32_t i = 0; i < MAX_SLOT_NUM; ++i) { + jtCtx.inColOffset[i] = offset; + offset += tDataTypes[jtInputColType[i]].bytes; + } + jtCtx.inColSize = offset; + jtCtx.inColBuf = taosMemoryMalloc(jtCtx.inColSize); + jtInitLogFile(); }