enh: add ASOF join unit tests

This commit is contained in:
dapan1121 2024-01-22 19:24:31 +08:00
parent 3fecb387cc
commit 888343f6fa
3 changed files with 131 additions and 12 deletions

View File

@ -20,7 +20,7 @@ extern "C" {
#endif
#if 1
#define MJOIN_DEFAULT_BLK_ROWS_NUM 10 //4096
#define MJOIN_DEFAULT_BLK_ROWS_NUM 2 //4096
#define MJOIN_HJOIN_CART_THRESHOLD 10
#define MJOIN_BLK_SIZE_LIMIT 0 //10485760
#define MJOIN_ROW_BITMAP_SIZE (2 * 1048576)

View File

@ -1837,7 +1837,7 @@ int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindow
}
int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) {
if (pCtx->cache.outBlk->info.rows <= 0) {
if (NULL == pCtx->cache.outBlk || pCtx->cache.outBlk->info.rows <= 0) {
return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true);
}

View File

@ -113,6 +113,7 @@ typedef struct {
int32_t colCond;
int32_t joinType;
int32_t subType;
int32_t asofOpType;
int64_t jLimit;
int32_t leftTotalRows;
@ -167,6 +168,12 @@ typedef struct {
int32_t rightFinMatchNum;
bool* rightFinMatch;
int64_t leftRowsNum;
SArray* leftRowsList;
int64_t rightRowsNum;
SArray* rightRowsList;
SArray* rightFilterOut;
} SJoinTestCtx;
typedef struct {
@ -795,6 +802,7 @@ SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(SJoinTestParam* param
jtCtx.subType = param->subType;
jtCtx.asc = param->asc;
jtCtx.jLimit = param->jLimit;
jtCtx.asofOpType = param->asofOp;
jtCtx.leftColOnly = (JOIN_TYPE_LEFT == param->joinType && JOIN_STYPE_SEMI == param->subType);
jtCtx.rightColOnly = (JOIN_TYPE_RIGHT == param->joinType && JOIN_STYPE_SEMI == param->subType);
@ -835,6 +843,62 @@ SSDataBlock* createDummyBlock(int32_t blkId) {
return p;
}
/**
 * Emits the expected result rows for every cached left-table row of an ASOF join.
 *
 * Reads jtCtx.leftRowsList and jtCtx.rightRowsList and hands each composed row
 * to pushResRow():
 *  - no cached right rows: each left row is emitted once, with the null flag
 *    set on every selected right-side result column;
 *  - otherwise: each left row is emitted once per cached right row, with the
 *    selected right-side columns (null flag, or value) taken from that right row.
 *
 * The cached left row buffer is overwritten in place while composing the output.
 *
 * NOTE(review): leftRowsList is not emptied here, so a later call emits the
 * same left rows again -- confirm whether it should be cleared after use.
 */
void appendAllAsofResRows() {
  int32_t leftRows = (int32_t)taosArrayGetSize(jtCtx.leftRowsList);
  int32_t rightRows = (int32_t)taosArrayGetSize(jtCtx.rightRowsList);

  if (rightRows <= 0) {
    // Nothing to match against: mark all selected right-side columns as NULL.
    for (int32_t i = 0; i < leftRows; ++i) {
      // taosArrayGet() yields an untyped pointer; the explicit cast is required in C++.
      char* leftResRows = (char*)taosArrayGet(jtCtx.leftRowsList, i);
      for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
        if (jtCtx.resColList[MAX_SLOT_NUM + c]) {
          *(bool*)(leftResRows + MAX_SLOT_NUM + c) = true;
        }
      }
      pushResRow(leftResRows, jtCtx.resColSize);
    }
    return;
  }

  // The right-side cache is expected to hold no more than jLimit rows at this point.
  ASSERT(rightRows <= jtCtx.jLimit);

  for (int32_t i = 0; i < leftRows; ++i) {
    char* leftResRows = (char*)taosArrayGet(jtCtx.leftRowsList, i);
    for (int32_t r = 0; r < rightRows; ++r) {
      char* rightResRows = (char*)taosArrayGet(jtCtx.rightRowsList, r);
      for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
        int32_t slot = MAX_SLOT_NUM + c;
        if (!jtCtx.resColList[slot]) {
          continue;
        }
        if (*(bool*)(rightResRows + slot)) {
          // Right value is NULL: only propagate the null flag.
          *(bool*)(leftResRows + slot) = true;
        } else {
          // Right value is present: clear the null flag and copy the column data.
          *(bool*)(leftResRows + slot) = false;
          memcpy(leftResRows + jtCtx.resColOffset[slot], rightResRows + jtCtx.resColOffset[slot],
                 tDataTypes[jtInputColType[c]].bytes);
        }
      }
      pushResRow(leftResRows, jtCtx.resColSize);
    }
  }
}
/**
 * Placeholder hook for appending ASOF result rows.
 *
 * At present it only queries the size of jtCtx.rightRowsList and returns;
 * no result rows are produced on either path.
 *
 * @param forceOut  when false and no right rows are cached, the function
 *                  returns immediately.
 */
void checkAppendAsofResRows(bool forceOut) {
  int32_t cachedRightRows = taosArrayGetSize(jtCtx.rightRowsList);
  if (!(forceOut || cachedRightRows > 0)) {
    return;
  }
}
/**
 * Keeps the cached right-table rows within the configured jLimit.
 *
 * When jtCtx.rightRowsList holds more than jtCtx.jLimit entries, the surplus
 * count is removed via taosArrayPopFrontBatch() from both rightRowsList and
 * rightFilterOut, using the same count for both lists.
 */
void trimForAsofJlimit() {
  int32_t cachedRows = taosArrayGetSize(jtCtx.rightRowsList);
  if (cachedRows > jtCtx.jLimit) {
    int64_t surplus = cachedRows - jtCtx.jLimit;
    taosArrayPopFrontBatch(jtCtx.rightRowsList, surplus);
    taosArrayPopFrontBatch(jtCtx.rightFilterOut, surplus);
  }
}
void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
if (grpRows <= 0) {
return;
@ -848,9 +912,11 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
jtCtx.inputStat |= (1 << blkId);
SArray* pTableRows = NULL;
int32_t tableOffset = 0;
int32_t peerOffset = 0;
bool keepRes = false;
bool keepInput = false;
if (blkId == LEFT_BLK_ID) {
if ((jtCtx.joinType == JOIN_TYPE_LEFT || jtCtx.joinType == JOIN_TYPE_FULL) && jtCtx.subType != JOIN_STYPE_SEMI) {
keepRes = true;
@ -862,6 +928,24 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
}
tableOffset = MAX_SLOT_NUM;
}
if (JOIN_STYPE_ASOF == jtCtx.subType && jtCtx.asofOpType != OP_TYPE_EQUAL) {
keepInput = true;
if (blkId == LEFT_BLK_ID) {
if (NULL == jtCtx.leftRowsList) {
jtCtx.leftRowsList = taosArrayInit(jtCtx.jLimit, jtCtx.resColSize);
jtCtx.leftRowsNum = 0;
}
pTableRows = jtCtx.leftRowsList;
} else {
if (NULL == jtCtx.rightRowsList) {
jtCtx.rightRowsList = taosArrayInit(jtCtx.jLimit, jtCtx.resColSize);
jtCtx.rightFilterOut = taosArrayInit(jtCtx.jLimit, sizeof(bool));
jtCtx.rightRowsNum = 0;
}
pTableRows = jtCtx.rightRowsList;
}
}
int32_t filterNum = (blkId == LEFT_BLK_ID) ? jtCtx.leftFilterNum : jtCtx.rightFilterNum;
int32_t peerFilterNum = (blkId == LEFT_BLK_ID) ? jtCtx.rightFilterNum : jtCtx.leftFilterNum;
@ -872,6 +956,7 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
int64_t tmpBigint = 0;
bool isNull = false;
bool filterOut = false;
bool addToRowList = false;
int32_t vRange = TMAX(grpRows / 3, 3);
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
jtCtx.grpOffset[c] = c * TMAX(100, grpRows);
@ -888,6 +973,8 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
if (!filterOut) {
memset(jtCtx.resColBuf, 0, jtCtx.resColSize);
}
addToRowList = false;
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
switch (jtInputColType[c]) {
@ -929,7 +1016,16 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet((*ppBlk)->pDataBlock, c);
colDataSetVal(pCol, (*ppBlk)->info.rows, pData, isNull);
if (keepRes && !filterOut && jtCtx.resColList[tableOffset + c]) {
if (keepInput && jtCtx.resColList[tableOffset + c]) {
if (!filterOut || (filterOut && blkId != LEFT_BLK_ID)) {
if (isNull) {
*(char*)(jtCtx.resColBuf + tableOffset + c) = true;
} else {
memcpy(jtCtx.resColBuf + jtCtx.resColOffset[tableOffset + c], pData, tDataTypes[jtInputColType[c]].bytes);
}
addToRowList = true;
}
} else if (keepRes && !filterOut && jtCtx.resColList[tableOffset + c]) {
if (isNull) {
*(char*)(jtCtx.resColBuf + tableOffset + c) = true;
} else {
@ -938,6 +1034,12 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
}
}
if (keepInput && addToRowList) {
taosArrayPush(pTableRows, jtCtx.resColBuf);
bool fout = filterOut ? true : false;
taosArrayPush(jtCtx.rightFilterOut, &fout);
}
if (keepRes && !filterOut) {
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
if (jtCtx.resColList[peerOffset + c]) {
@ -950,6 +1052,19 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
(*ppBlk)->info.rows++;
}
if (keepInput) {
if (jtCtx.asofOpType == OP_TYPE_GREATER_EQUAL || jtCtx.asofOpType == OP_TYPE_GREATER_THAN) {
if (blkId == LEFT_BLK_ID) {
appendAllAsofResRows();
} else {
trimForAsofJlimit();
}
} else {
checkAppendAsofResRows();
}
}
}
void createRowData(SSDataBlock* pBlk, int64_t tbOffset, int32_t rowIdx, int32_t vRange) {
@ -2165,7 +2280,6 @@ void jtInitLogFile() {
void initJoinTest() {
jtCtx.leftBlkList = taosArrayInit(10, POINTER_BYTES);
jtCtx.rightBlkList = taosArrayInit(10, POINTER_BYTES);
jtCtx.jtResRows = tSimpleHashInit(10000000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
joinTestReplaceRetrieveFp();
@ -2203,13 +2317,18 @@ void handleTestDone() {
jtCtx.resRows = 0;
jtCtx.inputStat = 0;
taosArrayDestroy(jtCtx.leftRowsList);
taosArrayDestroy(jtCtx.rightRowsList);
jtCtx.leftRowsList = NULL;
jtCtx.rightRowsList = NULL;
}
void runSingleTest(char* caseName, SJoinTestParam* param) {
bool contLoop = true;
SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param);
createDummyBlkList(200, 200, 200, 200, 20);
createDummyBlkList(10, 10, 10, 10, 3);
while (contLoop) {
rerunBlockedHere();
@ -2241,7 +2360,7 @@ void handleCaseEnd() {
} // namespace
#if 1
#if 0
#if 1
TEST(innerJoin, noCondTest) {
SJoinTestParam param;
@ -2344,7 +2463,7 @@ TEST(innerJoin, fullCondTest) {
#endif
#if 1
#if 0
#if 1
TEST(leftOuterJoin, noCondTest) {
SJoinTestParam param;
@ -2446,7 +2565,7 @@ TEST(leftOuterJoin, fullCondTest) {
#endif
#endif
#if 1
#if 0
#if 1
TEST(fullOuterJoin, noCondTest) {
SJoinTestParam param;
@ -2549,7 +2668,7 @@ TEST(fullOuterJoin, fullCondTest) {
#endif
#endif
#if 1
#if 0
#if 1
TEST(leftSemiJoin, noCondTest) {
SJoinTestParam param;
@ -2652,7 +2771,7 @@ TEST(leftSemiJoin, fullCondTest) {
#endif
#endif
#if 1
#if 0
#if 1
TEST(leftAntiJoin, noCondTest) {
SJoinTestParam param;
@ -2759,7 +2878,7 @@ TEST(leftAntiJoin, fullCondTest) {
#if 1
TEST(leftAsofJoin, noCondGreaterThanTest) {
SJoinTestParam param;
char* caseName = "leftAntiJoin:noCondGreaterThanTest";
char* caseName = "leftAsofJoin:noCondGreaterThanTest";
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
param.pTask = pTask;
@ -2787,7 +2906,7 @@ TEST(leftAsofJoin, noCondGreaterThanTest) {
#if 1
TEST(leftAsofJoin, eqCondTest) {
SJoinTestParam param;
char* caseName = "leftAntiJoin:eqCondTest";
char* caseName = "leftAsofJoin:eqCondTest";
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
param.pTask = pTask;