enh: add asof ut framework

This commit is contained in:
dapan1121 2024-01-23 14:47:24 +08:00
parent 888343f6fa
commit 3630cbd18d
1 changed files with 228 additions and 39 deletions

View File

@ -79,6 +79,7 @@ enum {
#define JT_MAX_JLIMIT 3
#define JT_KEY_SOLT_ID (MAX_SLOT_NUM - 1)
#define JT_PRIM_TS_SLOT_ID 0
int32_t jtInputColType[MAX_SLOT_NUM] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_BIGINT};
char* jtColCondStr[] = {"", "NO COND", "EQ COND", "ON COND", "FULL COND"};
@ -169,9 +170,10 @@ typedef struct {
int32_t rightFinMatchNum;
bool* rightFinMatch;
int64_t leftRowsNum;
int32_t inColOffset[MAX_SLOT_NUM];
int32_t inColSize;
char* inColBuf;
SArray* leftRowsList;
int64_t rightRowsNum;
SArray* rightRowsList;
SArray* rightFilterOut;
} SJoinTestCtx;
@ -843,50 +845,130 @@ SSDataBlock* createDummyBlock(int32_t blkId) {
return p;
}
void appendAsofLeftEachResGrps(char* leftInRow, int32_t rightOffset, int32_t rightRows) {
memset(jtCtx.resColBuf, 0, jtCtx.resColSize);
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
if (!jtCtx.resColList[c]) {
continue;
}
if (*((bool*)leftInRow + c)) {
*(char*)(jtCtx.resColBuf + c) = true;
} else {
memcpy(jtCtx.resColBuf + jtCtx.resColOffset[c], leftInRow + jtCtx.inColOffset[c], tDataTypes[jtInputColType[c]].bytes);
}
}
for (int32_t r = rightOffset; r < rightRows; ++r) {
bool* rightFilterOut = taosArrayGet(jtCtx.rightFilterOut, r);
if (*rightFilterOut) {
continue;
}
char* rightResRows = taosArrayGet(jtCtx.rightRowsList, r);
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
if (jtCtx.resColList[MAX_SLOT_NUM + c]) {
if (*(bool*)(rightResRows + MAX_SLOT_NUM + c)) {
*(bool*)(jtCtx.resColBuf + MAX_SLOT_NUM + c) = true;
} else {
*(bool*)(jtCtx.resColBuf + MAX_SLOT_NUM + c) = false;
memcpy(jtCtx.resColBuf + jtCtx.resColOffset[MAX_SLOT_NUM + c], rightResRows + jtCtx.inColOffset[c], tDataTypes[jtInputColType[c]].bytes);
}
}
}
pushResRow(jtCtx.resColBuf, jtCtx.resColSize);
}
}
void appendAsofLeftNonMatchGrp(char* leftInRow) {
memset(jtCtx.resColBuf, 0, jtCtx.resColSize);
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
if (!jtCtx.resColList[c]) {
continue;
}
if (*((bool*)leftInRow + c)) {
*(char*)(jtCtx.resColBuf + c) = true;
} else {
memcpy(jtCtx.resColBuf + jtCtx.resColOffset[c], leftInRow + jtCtx.inColOffset[c], tDataTypes[jtInputColType[c]].bytes);
}
}
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
if (jtCtx.resColList[MAX_SLOT_NUM + c]) {
*(char*)(jtCtx.resColBuf + MAX_SLOT_NUM + c) = true;
}
}
pushResRow(jtCtx.resColBuf, jtCtx.resColSize);
}
void appendAllAsofResRows() {
int32_t leftRows = taosArrayGetSize(jtCtx.leftRowsList);
int32_t rightRows = taosArrayGetSize(jtCtx.rightRowsList);
if (rightRows <= 0) {
for (int32_t i = 0; i < leftRows; ++i) {
char* leftResRows = taosArrayGet(jtCtx.leftRowsList, i);
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
if (jtCtx.resColList[MAX_SLOT_NUM + c]) {
*(char*)(leftResRows + MAX_SLOT_NUM + c) = true;
}
}
pushResRow(leftResRows, jtCtx.resColSize);
char* leftInRow = taosArrayGet(jtCtx.leftRowsList, i);
appendAsofLeftNonMatchGrp(leftInRow);
}
} else {
ASSERT(rightRows <= jtCtx.jLimit);
for (int32_t i = 0; i < leftRows; ++i) {
char* leftResRows = taosArrayGet(jtCtx.leftRowsList, i);
for (int32_t r = 0; r < rightRows; ++r) {
char* rightResRows = taosArrayGet(jtCtx.rightRowsList, r);
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
if (jtCtx.resColList[MAX_SLOT_NUM + c]) {
if (*(bool*)(rightResRows + MAX_SLOT_NUM + c)) {
*(bool*)(leftResRows + MAX_SLOT_NUM + c) = true;
} else {
*(bool*)(leftResRows + MAX_SLOT_NUM + c) = false;
memcpy(leftResRows + jtCtx.resColOffset[MAX_SLOT_NUM + c], rightResRows + jtCtx.resColOffset[MAX_SLOT_NUM + c], tDataTypes[jtInputColType[c]].bytes);
}
}
}
pushResRow(leftResRows, jtCtx.resColSize);
}
char* leftInRow = taosArrayGet(jtCtx.leftRowsList, i);
appendAsofLeftEachResGrps(leftInRow, 0, rightRows);
}
}
taosArrayClear(jtCtx.leftRowsList);
}
void checkAppendAsofResRows(bool forceOut) {
void chkAppendAsofGreaterResRows(bool forceOut) {
int32_t rightRows = taosArrayGetSize(jtCtx.rightRowsList);
if (rightRows <= 0 && !forceOut) {
if (rightRows < jtCtx.jLimit && !forceOut) {
return;
}
int32_t rightRemains = rightRows;
int32_t rightOffset = 0;
int32_t leftRows = taosArrayGetSize(jtCtx.leftRowsList);
int32_t i = 0;
for (; i < leftRows; ++i) {
char* leftRow = taosArrayGet(jtCtx.leftRowsList, i);
int64_t* leftTs = (int64_t*)(leftRow + jtCtx.inColOffset[JT_PRIM_TS_SLOT_ID]);
bool append = false;
for (int32_t r = rightOffset; r < rightRows; ++r) {
char* rightRow = taosArrayGet(jtCtx.rightRowsList, r);
int64_t* rightTs = (int64_t*)(rightRow + jtCtx.inColOffset[JT_PRIM_TS_SLOT_ID]);
if ((*leftTs > *rightTs) || (*leftTs == *rightTs && OP_TYPE_LOWER_THAN == jtCtx.asofOpType)) {
rightOffset++;
rightRemains--;
if (rightRemains < jtCtx.jLimit && !forceOut) {
taosArrayPopFrontBatch(jtCtx.rightRowsList, rightOffset);
taosArrayPopFrontBatch(jtCtx.rightFilterOut, rightOffset);
taosArrayPopFrontBatch(jtCtx.leftRowsList, i);
return;
}
continue;
}
appendAsofLeftEachResGrps(leftRow, rightOffset, jtCtx.jLimit);
append = true;
break;
}
if (!append) {
if (!forceOut) {
break;
}
appendAsofLeftNonMatchGrp(leftRow);
}
}
taosArrayPopFrontBatch(jtCtx.rightRowsList, rightOffset);
taosArrayPopFrontBatch(jtCtx.rightFilterOut, rightOffset);
taosArrayPopFrontBatch(jtCtx.leftRowsList, i);
}
void trimForAsofJlimit() {
@ -933,15 +1015,13 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
keepInput = true;
if (blkId == LEFT_BLK_ID) {
if (NULL == jtCtx.leftRowsList) {
jtCtx.leftRowsList = taosArrayInit(jtCtx.jLimit, jtCtx.resColSize);
jtCtx.leftRowsNum = 0;
jtCtx.leftRowsList = taosArrayInit(jtCtx.jLimit, jtCtx.inColSize);
}
pTableRows = jtCtx.leftRowsList;
} else {
if (NULL == jtCtx.rightRowsList) {
jtCtx.rightRowsList = taosArrayInit(jtCtx.jLimit, jtCtx.resColSize);
jtCtx.rightRowsList = taosArrayInit(jtCtx.jLimit, jtCtx.inColSize);
jtCtx.rightFilterOut = taosArrayInit(jtCtx.jLimit, sizeof(bool));
jtCtx.rightRowsNum = 0;
}
pTableRows = jtCtx.rightRowsList;
}
@ -1016,12 +1096,12 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet((*ppBlk)->pDataBlock, c);
colDataSetVal(pCol, (*ppBlk)->info.rows, pData, isNull);
if (keepInput && jtCtx.resColList[tableOffset + c]) {
if (!filterOut || (filterOut && blkId != LEFT_BLK_ID)) {
if (keepInput) {
if (!filterOut || (blkId != LEFT_BLK_ID)) {
if (isNull) {
*(char*)(jtCtx.resColBuf + tableOffset + c) = true;
*(char*)(jtCtx.inColBuf + c) = true;
} else {
memcpy(jtCtx.resColBuf + jtCtx.resColOffset[tableOffset + c], pData, tDataTypes[jtInputColType[c]].bytes);
memcpy(jtCtx.inColBuf + jtCtx.inColOffset[c], pData, tDataTypes[jtInputColType[c]].bytes);
}
addToRowList = true;
}
@ -1035,7 +1115,7 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
}
if (keepInput && addToRowList) {
taosArrayPush(pTableRows, jtCtx.resColBuf);
taosArrayPush(pTableRows, jtCtx.inColBuf);
bool fout = filterOut ? true : false;
taosArrayPush(jtCtx.rightFilterOut, &fout);
}
@ -1061,7 +1141,7 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
trimForAsofJlimit();
}
} else {
checkAppendAsofResRows();
chkAppendAsofGreaterResRows();
}
}
@ -1666,6 +1746,99 @@ void antiJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) {
}
void addAsofEqInRows(int32_t rowsNum, int64_t tbOffset, bool leftTable) {
bool filterOut = false;
void* cvalue = NULL, *filterValue = NULL;
int64_t cbig = 0, fbig = 0;
int32_t filterNum = leftTable ? jtCtx.leftFilterNum : jtCtx.rightFilterNum;
int32_t* filterCol = leftTable ? jtCtx.leftFilterColList : jtCtx.rightFilterColList;
SArray* rowList = leftTable ? jtCtx.leftRowsList : jtCtx.rightRowsList;
if (!leftTable) {
rowsNum = TMIN(rowsNum, jtCtx.jLimit);
}
for (int32_t l = 0; l < rowsNum; ++l) {
char* row = jtCtx.colRowDataBuf + tbOffset + jtCtx.blkRowSize * l;
filterOut = false;
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
cvalue = row + jtCtx.colRowOffset[c];
switch (jtInputColType[c]) {
case TSDB_DATA_TYPE_TIMESTAMP:
fbig = TIMESTAMP_FILTER_VALUE;
cbig = *(int64_t*)cvalue;
break;
case TSDB_DATA_TYPE_INT:
fbig = INT_FILTER_VALUE;
cbig = *(int32_t*)cvalue;
break;
case TSDB_DATA_TYPE_BIGINT:
fbig = BIGINT_FILTER_VALUE;
cbig = *(int64_t*)cvalue;
break;
default:
break;
}
if (filterNum && filterCol[c] && ((*(bool*)(row + c)) || cbig <= fbig)) {
filterOut = true;
break;
}
}
if (filterOut && leftTable) {
continue;
}
taosArrayPush(rowList, row);
if (!leftTable) {
taosArrayPush(jtCtx.rightFilterOut, &filterOut);
}
}
if (!leftTable) {
trimForAsofJlimit();
}
}
void asofJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) {
bool filterOut = false;
void* lValue = NULL, *rValue = NULL, *filterValue = NULL;
int64_t lBig = 0, rBig = 0, fbig = 0;
int64_t rightTbOffset = jtCtx.blkRowSize * leftGrpRows;
switch (jtCtx.asofOpType) {
case OP_TYPE_GREATER_THAN:
addAsofEqInRows(leftGrpRows, 0, true);
appendAllAsofResRows();
addAsofEqInRows(rightGrpRows, rightTbOffset, false);
break;
case OP_TYPE_GREATER_EQUAL:
addAsofEqInRows(leftGrpRows, 0, true);
addAsofEqInRows(rightGrpRows, rightTbOffset, false);
appendAllAsofResRows();
break;
case OP_TYPE_LOWER_THAN:
case OP_TYPE_LOWER_EQUAL:
addAsofEqInRows(leftGrpRows, 0, true);
addAsofEqInRows(rightGrpRows, rightTbOffset, false);
chkAppendAsofGreaterResRows(false);
break;
case OP_TYPE_EQUAL:
taosArrayClear(jtCtx.leftRowsList);
taosArrayClear(jtCtx.rightRowsList);
taosArrayClear(jtCtx.rightFilterOut);
addAsofEqInRows(leftGrpRows, 0, true);
addAsofEqInRows(rightGrpRows, rightTbOffset, false);
chkAppendAsofGreaterResRows(true);
break;
default:
return;
}
}
void fullJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) {
bool leftMatch = false, rightMatch = false, lfilterOut = false, rfilterOut = false;
@ -1799,6 +1972,9 @@ void appendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) {
case JOIN_STYPE_ANTI:
antiJoinAppendEqGrpRes(leftGrpRows, rightGrpRows);
break;
case JOIN_STYPE_ASOF:
asofJoinAppendEqGrpRes(leftGrpRows, rightGrpRows);
break;
default:
break;
}
@ -1899,6 +2075,11 @@ void createBothBlkRowsData(void) {
break;
}
}
if (JOIN_STYPE_ASOF == jtCtx.subType && taosArrayGetSize(jtCtx.leftRowsList) > 0) {
ASSERT(OP_TYPE_LOWER_EQUAL == jtCtx.asofOpType || OP_TYPE_LOWER_THAN == jtCtx.asofOpType);
chkAppendAsofGreaterResRows(true);
}
}
void createDummyBlkList(int32_t leftMaxRows, int32_t leftMaxGrpRows, int32_t rightMaxRows, int32_t rightMaxGrpRows, int32_t blkRows) {
@ -2288,6 +2469,14 @@ void initJoinTest() {
jtStat.pHistory = taosArrayInit(100000, sizeof(SJoinTestHistory));
}
int32_t offset = MAX_SLOT_NUM * sizeof(bool);
for (int32_t i = 0; i < MAX_SLOT_NUM; ++i) {
jtCtx.inColOffset[i] = offset;
offset += tDataTypes[jtInputColType[i]].bytes;
}
jtCtx.inColSize = offset;
jtCtx.inColBuf = taosMemoryMalloc(jtCtx.inColSize);
jtInitLogFile();
}