enh: fix asof bugs

This commit is contained in:
dapan1121 2024-01-29 16:58:08 +08:00
parent 6640809638
commit 32b16bd6e7
3 changed files with 269 additions and 106 deletions

View File

@ -197,14 +197,6 @@ typedef enum {
E_CACHE_INBLK E_CACHE_INBLK
} SMJoinCacheMode; } SMJoinCacheMode;
typedef struct SAsofJoinGrpRows {
SSDataBlock* blk;
bool clonedBlk;
int32_t blkRowIdx;
int32_t readIdx;
} SAsofJoinGrpRows;
typedef struct SMJoinWinCache { typedef struct SMJoinWinCache {
int32_t pageLimit; int32_t pageLimit;
int32_t outRowIdx; int32_t outRowIdx;
@ -228,9 +220,9 @@ typedef struct SMJoinWindowCtx {
// KEEP IT FIRST // KEEP IT FIRST
int32_t asofOpType; int32_t asofOpType;
bool asofLowerRow; bool lowerRowsAcq;
bool asofEqRow; bool eqRowsAcq;
bool asofGreaterRow; bool greaterRowsAcq;
bool eqPostDone; bool eqPostDone;
int64_t lastTs; int64_t lastTs;
@ -335,11 +327,12 @@ typedef struct SMJoinOperatorInfo {
#define MJOIN_SAVE_TB_BLK(_cache, _tb) \ #define MJOIN_SAVE_TB_BLK(_cache, _tb) \
do { \ do { \
ASSERT(taosArrayGetSize((_cache)->grps) >= 1); \
SMJoinGrpRows* pGrp = taosArrayGet((_cache)->grps, 0); \ SMJoinGrpRows* pGrp = taosArrayGet((_cache)->grps, 0); \
ASSERT(pGrp->blk == (_tb)->blk); \ if (NULL != pGrp) { \
pGrp->beginIdx = (_tb)->blkRowIdx; \ ASSERT(pGrp->blk == (_tb)->blk); \
pGrp->readIdx = pGrp->beginIdx; \ pGrp->beginIdx = (_tb)->blkRowIdx; \
pGrp->readIdx = pGrp->beginIdx; \
} \
} while (0) } while (0)
#define MJOIN_POP_TB_BLK(_cache) \ #define MJOIN_POP_TB_BLK(_cache) \

View File

@ -1784,9 +1784,11 @@ int32_t mAsofLowerAddRowsToCache(SMJoinWindowCtx* pCtx, SMJoinGrpRows* pGrp, boo
int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp) { int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp) {
int64_t eqRowsNum = 0; int64_t eqRowsNum = 0;
SMJoinGrpRows grp = {.blk = pTable->blk}; SMJoinGrpRows grp;
do { do {
grp.blk = pTable->blk;
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot);
if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) { if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
@ -1813,7 +1815,8 @@ int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindow
} }
if (eqRowsNum < pCtx->jLimit) { if (eqRowsNum < pCtx->jLimit) {
MJ_ERR_RET(mAsofLowerAddRowsToCache(pCtx, &grp, false)); grp.endIdx = grp.beginIdx + TMIN(grp.endIdx - grp.beginIdx + 1, pCtx->jLimit - eqRowsNum) - 1;
MJ_ERR_RET(mAsofLowerAddRowsToCache(pCtx, &grp, true));
} }
eqRowsNum += grp.endIdx - grp.beginIdx + 1; eqRowsNum += grp.endIdx - grp.beginIdx + 1;
@ -1823,6 +1826,7 @@ int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindow
qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0); qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);
pTable->blkRowIdx = 0; pTable->blkRowIdx = 0;
pCtx->buildGrp.blk = pTable->blk;
if (NULL == pTable->blk) { if (NULL == pTable->blk) {
pTable->dsFetchDone = true; pTable->dsFetchDone = true;
@ -1893,6 +1897,8 @@ int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) {
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp)); MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
rowsLeft -= grpRemainRows; rowsLeft -= grpRemainRows;
pCtx->cache.outRowIdx = 0; pCtx->cache.outRowIdx = 0;
probeGrp->readIdx++;
probeGrp->endIdx = probeEndIdx;
continue; continue;
} }
@ -1908,40 +1914,45 @@ int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t mAsofLowerDumpUpdateEqRows(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, bool lastBuildGrp) { int32_t mAsofLowerDumpUpdateEqRows(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, bool lastBuildGrp, bool skipEqPost) {
if (!pCtx->asofEqRow) { if (!pCtx->eqRowsAcq) {
MJ_ERR_RET(mAsofLowerDumpGrpCache(pCtx)); MJ_ERR_RET(mAsofLowerDumpGrpCache(pCtx));
pCtx->lastEqGrp = true;
if (pCtx->grpRemains) { if (pCtx->grpRemains) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (!pCtx->eqPostDone && !lastBuildGrp) {
pCtx->eqPostDone = true;
return mAsofLowerAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs);
}
return TSDB_CODE_SUCCESS;
} }
if (!pCtx->eqPostDone && !lastBuildGrp) { if (!pCtx->eqPostDone && !lastBuildGrp && (pCtx->eqRowsAcq || !skipEqPost)) {
pCtx->eqPostDone = true; pCtx->eqPostDone = true;
MJ_ERR_RET(mAsofLowerAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs)); MJ_ERR_RET(mAsofLowerAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs));
} }
return mAsofLowerDumpGrpCache(pCtx); if (!pCtx->eqRowsAcq) {
return TSDB_CODE_SUCCESS;
}
MJ_ERR_RET(mAsofLowerDumpGrpCache(pCtx));
pCtx->lastEqGrp = true;
return TSDB_CODE_SUCCESS;
} }
int32_t mAsofLowerProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) { int32_t mAsofLowerProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) {
SMJoinOperatorInfo* pJoin = pCtx->pJoin; SMJoinOperatorInfo* pJoin = pCtx->pJoin;
pCtx->lastEqGrp = true;
if (!lastBuildGrp) { if (!lastBuildGrp) {
pCtx->eqPostDone = false; pCtx->eqPostDone = false;
} }
MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, NULL, &pCtx->probeGrp)); bool wholeBlk = false;
MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, &wholeBlk, &pCtx->probeGrp));
return mAsofLowerDumpUpdateEqRows(pCtx, pJoin, lastBuildGrp); MJ_ERR_RET(mAsofLowerDumpUpdateEqRows(pCtx, pJoin, lastBuildGrp, wholeBlk));
return TSDB_CODE_SUCCESS;
} }
@ -1990,7 +2001,7 @@ int32_t mAsofLowerProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* p
} }
int32_t mAsofLowerHandleGrpRemains(SMJoinWindowCtx* pCtx) { int32_t mAsofLowerHandleGrpRemains(SMJoinWindowCtx* pCtx) {
return (pCtx->lastEqGrp) ? mAsofLowerDumpUpdateEqRows(pCtx, pCtx->pJoin, false) : mAsofLowerDumpGrpCache(pCtx); return (pCtx->lastEqGrp) ? mAsofLowerDumpUpdateEqRows(pCtx, pCtx->pJoin, false, true) : mAsofLowerDumpGrpCache(pCtx);
} }
static bool mAsofLowerRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { static bool mAsofLowerRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
@ -2062,6 +2073,12 @@ SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) {
} }
} }
if (pCtx->lastEqGrp && !pCtx->eqPostDone) {
pCtx->eqPostDone = true;
MJ_ERR_JRET(mAsofLowerAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs));
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
}
while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
if (probeTs == buildTs) { if (probeTs == buildTs) {
pCtx->lastTs = probeTs; pCtx->lastTs = probeTs;
@ -2092,6 +2109,7 @@ SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) {
pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1; pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1;
MJ_ERR_JRET(mAsofLowerDumpGrpCache(pCtx)); MJ_ERR_JRET(mAsofLowerDumpGrpCache(pCtx));
pCtx->lastEqGrp = false;
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
@ -2208,8 +2226,13 @@ int32_t mAsofGreaterChkFillGrpCache(SMJoinWindowCtx* pCtx) {
} }
//ASSERT((pGrp->endIdx - pGrp->beginIdx + 1) == pCtx->cache.rowNum); //ASSERT((pGrp->endIdx - pGrp->beginIdx + 1) == pCtx->cache.rowNum);
} else {
ASSERT(grpNum == 1);
} }
ASSERT(taosArrayGetSize(pCache->grps) == 1);
ASSERT(pGrp->blk->info.rows - pGrp->beginIdx == pCtx->cache.rowNum);
do { do {
build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, build->downStreamIdx); build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, build->downStreamIdx);
qDebug("%s merge join %s table got block to fill grp, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0); qDebug("%s merge join %s table got block to fill grp, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0);
@ -2227,6 +2250,7 @@ int32_t mAsofGreaterChkFillGrpCache(SMJoinWindowCtx* pCtx) {
} }
MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, build->blk, 0, build->blk->info.rows)); MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, build->blk, 0, build->blk->info.rows));
pCache->rowNum += build->blk->info.rows;
//pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx; //pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx;
} while (pCache->rowNum < pCtx->jLimit); } while (pCache->rowNum < pCtx->jLimit);
@ -2259,17 +2283,16 @@ void mAsofGreaterUpdateBuildGrpEndIdx(SMJoinWindowCtx* pCtx) {
int32_t mAsofGreaterFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) { int32_t mAsofGreaterFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) {
if (!lastBuildGrp) { if (!lastBuildGrp) {
MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build);
MJ_ERR_RET(mAsofGreaterChkFillGrpCache(pCtx)); MJ_ERR_RET(mAsofGreaterChkFillGrpCache(pCtx));
} }
MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build);
mAsofGreaterUpdateBuildGrpEndIdx(pCtx); mAsofGreaterUpdateBuildGrpEndIdx(pCtx);
return mAsofGreaterDumpGrpCache(pCtx); return mAsofGreaterDumpGrpCache(pCtx);
} }
int32_t mAsofGreaterSkipEqRows(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk) { int32_t mAsofGreaterSkipEqRows(SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk) {
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot);
if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) { if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
@ -2278,17 +2301,22 @@ int32_t mAsofGreaterSkipEqRows(SMJoinTableCtx* pTable, int64_t timestamp, bool*
} }
pTable->blkRowIdx++; pTable->blkRowIdx++;
pCtx->cache.rowNum--;
char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1); char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
if (timestamp != *(int64_t*)pEndVal) { if (timestamp != *(int64_t*)pEndVal) {
for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) { for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx); char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
if (timestamp == *(int64_t*)pNextVal) { if (timestamp == *(int64_t*)pNextVal) {
pCtx->cache.rowNum--;
continue; continue;
} }
*wholeBlk = false; *wholeBlk = false;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else {
pCtx->cache.rowNum -= (pTable->blk->info.rows - pTable->blkRowIdx);
} }
*wholeBlk = true; *wholeBlk = true;
@ -2303,7 +2331,7 @@ int32_t mAsofGreaterSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) {
do { do {
do { do {
MJ_ERR_RET(mAsofGreaterSkipEqRows(pTable, timestamp, &wholeBlk)); MJ_ERR_RET(mAsofGreaterSkipEqRows(pCtx, pTable, timestamp, &wholeBlk));
if (!wholeBlk) { if (!wholeBlk) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -2312,6 +2340,9 @@ int32_t mAsofGreaterSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) {
MJOIN_RESTORE_TB_BLK(cache, pTable); MJOIN_RESTORE_TB_BLK(cache, pTable);
} while (!MJOIN_BUILD_TB_ROWS_DONE(pTable)); } while (!MJOIN_BUILD_TB_ROWS_DONE(pTable));
ASSERT(pCtx->cache.rowNum == 0);
ASSERT(taosArrayGetSize(pCtx->cache.grps) == 0);
pTable->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pTable->downStreamIdx); pTable->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pTable->downStreamIdx);
qDebug("%s merge join %s table got block to skip eq ts, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0); qDebug("%s merge join %s table got block to skip eq ts, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);
@ -2330,7 +2361,7 @@ int32_t mAsofGreaterSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) {
int32_t mAsofGreaterUpdateDumpEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) { int32_t mAsofGreaterUpdateDumpEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) {
if (!pCtx->asofEqRow && !lastBuildGrp) { if (!pCtx->eqRowsAcq && !lastBuildGrp) {
MJ_ERR_RET(mAsofGreaterSkipAllEqRows(pCtx, timestamp)); MJ_ERR_RET(mAsofGreaterSkipAllEqRows(pCtx, timestamp));
} }
@ -2368,16 +2399,16 @@ int32_t mAsofGreaterProcessLowerGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* p
return mAsofGreaterFillDumpGrpCache(pCtx, false); return mAsofGreaterFillDumpGrpCache(pCtx, false);
} }
int32_t mAsofGreaterProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { int32_t mAsofGreaterProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData** pCol, int64_t* probeTs, int64_t* buildTs) {
do { do {
MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pJoin->build); MJOIN_GET_TB_CUR_TS(*pCol, *buildTs, pJoin->build);
if (!PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) { if (!PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) {
break; break;
} }
pCtx->cache.rowNum--; pCtx->cache.rowNum--;
while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pJoin->build); MJOIN_GET_TB_CUR_TS(*pCol, *buildTs, pJoin->build);
if (PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) { if (PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) {
pCtx->cache.rowNum--; pCtx->cache.rowNum--;
continue; continue;
@ -2388,6 +2419,7 @@ int32_t mAsofGreaterProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo*
MJOIN_POP_TB_BLK(&pCtx->cache); MJOIN_POP_TB_BLK(&pCtx->cache);
MJOIN_RESTORE_TB_BLK(&pCtx->cache, pJoin->build); MJOIN_RESTORE_TB_BLK(&pCtx->cache, pJoin->build);
MJOIN_GET_TB_COL_TS(*pCol, *buildTs, pJoin->build);
} while (!MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)); } while (!MJOIN_BUILD_TB_ROWS_DONE(pJoin->build));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -2398,7 +2430,10 @@ static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* p
bool buildGot = false; bool buildGot = false;
do { do {
if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { if ((probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) && pCtx->cache.rowNum < pCtx->jLimit) {
pJoin->build->newBlk = false;
MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build);
ASSERT(taosArrayGetSize(pCtx->cache.grps) <= 1);
buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
} }
@ -2412,6 +2447,7 @@ static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* p
SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot); SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot);
if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) { if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) {
pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
MJOIN_POP_TB_BLK(&pCtx->cache);
continue; continue;
} }
} }
@ -2419,7 +2455,7 @@ static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* p
break; break;
} while (true); } while (true);
if (buildGot) { if (buildGot && pJoin->build->newBlk) {
if (NULL == pCtx->cache.outBlk) { if (NULL == pCtx->cache.outBlk) {
pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false); pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false);
blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit); blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit);
@ -2491,7 +2527,7 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) {
if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) { if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) {
MJ_ERR_JRET(mAsofGreaterProcessLowerGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs)); MJ_ERR_JRET(mAsofGreaterProcessLowerGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs));
} else { } else {
MJ_ERR_JRET(mAsofGreaterProcessGreaterGrp(pCtx, pJoin, pBuildCol, &probeTs, &buildTs)); MJ_ERR_JRET(mAsofGreaterProcessGreaterGrp(pCtx, pJoin, &pBuildCol, &probeTs, &buildTs));
} }
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
@ -2551,16 +2587,34 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p
SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx; SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
pCtx->pJoin = pJoin; pCtx->pJoin = pJoin;
pCtx->asofOpType = pJoinNode->asofOpType;
pCtx->asofEqRow = ASOF_EQ_ROW_INCLUDED(pCtx->asofOpType);
pCtx->asofLowerRow = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType);
pCtx->asofGreaterRow = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType);
pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1; pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1;
if (pCtx->asofLowerRow) { switch (pJoinNode->subType) {
pJoin->joinFp = mAsofLowerJoinDo; case JOIN_STYPE_ASOF:
} else if (pCtx->asofGreaterRow) { pCtx->asofOpType = pJoinNode->asofOpType;
pJoin->joinFp = mAsofGreaterJoinDo; pCtx->eqRowsAcq = ASOF_EQ_ROW_INCLUDED(pCtx->asofOpType);
pCtx->lowerRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType);
pCtx->greaterRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType);
if (pCtx->lowerRowsAcq) {
pJoin->joinFp = mAsofLowerJoinDo;
} else if (pCtx->greaterRowsAcq) {
pJoin->joinFp = mAsofGreaterJoinDo;
}
break;
case JOIN_STYPE_WIN:
pCtx->eqRowsAcq = ASOF_EQ_ROW_INCLUDED(pCtx->asofOpType);
pCtx->lowerRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType);
pCtx->greaterRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType);
if (pCtx->lowerRowsAcq) {
pJoin->joinFp = mAsofLowerJoinDo;
} else if (pCtx->greaterRowsAcq) {
pJoin->joinFp = mAsofGreaterJoinDo;
}
break;
default:
break;
} }
if (pJoinNode->node.inputTsOrder != ORDER_DESC) { if (pJoinNode->node.inputTsOrder != ORDER_DESC) {

View File

@ -66,7 +66,7 @@ enum {
}; };
#define COL_DISPLAY_WIDTH 18 #define COL_DISPLAY_WIDTH 18
#define JT_MAX_LOOP 30000 #define JT_MAX_LOOP 5000
#define LEFT_BLK_ID 0 #define LEFT_BLK_ID 0
#define RIGHT_BLK_ID 1 #define RIGHT_BLK_ID 1
@ -859,17 +859,19 @@ void appendAsofLeftEachResGrps(char* leftInRow, int32_t rightOffset, int32_t rig
} }
} }
for (int32_t r = rightOffset; r < rightRows; ++r) { int32_t endIdx = TMIN(rightRows, taosArrayGetSize(jtCtx.rightRowsList) - rightOffset) + rightOffset;
bool* rightFilterOut = taosArrayGet(jtCtx.rightFilterOut, r); for (int32_t r = rightOffset; r < endIdx; ++r) {
bool* rightFilterOut = (bool*)taosArrayGet(jtCtx.rightFilterOut, r);
if (*rightFilterOut) { if (*rightFilterOut) {
continue; continue;
} }
char* rightResRows = taosArrayGet(jtCtx.rightRowsList, r); char* rightResRows = (char*)taosArrayGet(jtCtx.rightRowsList, r);
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
if (jtCtx.resColList[MAX_SLOT_NUM + c]) { if (jtCtx.resColList[MAX_SLOT_NUM + c]) {
if (*(bool*)(rightResRows + MAX_SLOT_NUM + c)) { if (*(bool*)(rightResRows + c)) {
*(bool*)(jtCtx.resColBuf + MAX_SLOT_NUM + c) = true; *(bool*)(jtCtx.resColBuf + MAX_SLOT_NUM + c) = true;
memset(jtCtx.resColBuf + jtCtx.resColOffset[MAX_SLOT_NUM + c], 0, tDataTypes[jtInputColType[c]].bytes);
} else { } else {
*(bool*)(jtCtx.resColBuf + MAX_SLOT_NUM + c) = false; *(bool*)(jtCtx.resColBuf + MAX_SLOT_NUM + c) = false;
memcpy(jtCtx.resColBuf + jtCtx.resColOffset[MAX_SLOT_NUM + c], rightResRows + jtCtx.inColOffset[c], tDataTypes[jtInputColType[c]].bytes); memcpy(jtCtx.resColBuf + jtCtx.resColOffset[MAX_SLOT_NUM + c], rightResRows + jtCtx.inColOffset[c], tDataTypes[jtInputColType[c]].bytes);
@ -908,14 +910,16 @@ void appendAllAsofResRows() {
int32_t leftRows = taosArrayGetSize(jtCtx.leftRowsList); int32_t leftRows = taosArrayGetSize(jtCtx.leftRowsList);
int32_t rightRows = taosArrayGetSize(jtCtx.rightRowsList); int32_t rightRows = taosArrayGetSize(jtCtx.rightRowsList);
if (rightRows <= 0) { if (rightRows <= 0) {
for (int32_t i = 0; i < leftRows; ++i) { if (0 == jtCtx.rightFilterNum) {
char* leftInRow = taosArrayGet(jtCtx.leftRowsList, i); for (int32_t i = 0; i < leftRows; ++i) {
appendAsofLeftNonMatchGrp(leftInRow); char* leftInRow = (char*)taosArrayGet(jtCtx.leftRowsList, i);
appendAsofLeftNonMatchGrp(leftInRow);
}
} }
} else { } else {
ASSERT(rightRows <= jtCtx.jLimit); ASSERT(rightRows <= jtCtx.jLimit);
for (int32_t i = 0; i < leftRows; ++i) { for (int32_t i = 0; i < leftRows; ++i) {
char* leftInRow = taosArrayGet(jtCtx.leftRowsList, i); char* leftInRow = (char*)taosArrayGet(jtCtx.leftRowsList, i);
appendAsofLeftEachResGrps(leftInRow, 0, rightRows); appendAsofLeftEachResGrps(leftInRow, 0, rightRows);
} }
} }
@ -933,11 +937,11 @@ void chkAppendAsofGreaterResRows(bool forceOut) {
int32_t leftRows = taosArrayGetSize(jtCtx.leftRowsList); int32_t leftRows = taosArrayGetSize(jtCtx.leftRowsList);
int32_t i = 0; int32_t i = 0;
for (; i < leftRows; ++i) { for (; i < leftRows; ++i) {
char* leftRow = taosArrayGet(jtCtx.leftRowsList, i); char* leftRow = (char*)taosArrayGet(jtCtx.leftRowsList, i);
int64_t* leftTs = (int64_t*)(leftRow + jtCtx.inColOffset[JT_PRIM_TS_SLOT_ID]); int64_t* leftTs = (int64_t*)(leftRow + jtCtx.inColOffset[JT_PRIM_TS_SLOT_ID]);
bool append = false; bool append = false;
for (int32_t r = rightOffset; r < rightRows; ++r) { for (int32_t r = rightOffset; r < rightRows; ++r) {
char* rightRow = taosArrayGet(jtCtx.rightRowsList, r); char* rightRow = (char*)taosArrayGet(jtCtx.rightRowsList, r);
int64_t* rightTs = (int64_t*)(rightRow + jtCtx.inColOffset[JT_PRIM_TS_SLOT_ID]); int64_t* rightTs = (int64_t*)(rightRow + jtCtx.inColOffset[JT_PRIM_TS_SLOT_ID]);
if ((*leftTs > *rightTs) || (*leftTs == *rightTs && OP_TYPE_LOWER_THAN == jtCtx.asofOpType)) { if ((*leftTs > *rightTs) || (*leftTs == *rightTs && OP_TYPE_LOWER_THAN == jtCtx.asofOpType)) {
rightOffset++; rightOffset++;
@ -962,7 +966,9 @@ void chkAppendAsofGreaterResRows(bool forceOut) {
break; break;
} }
appendAsofLeftNonMatchGrp(leftRow); if (0 == jtCtx.rightFilterNum) {
appendAsofLeftNonMatchGrp(leftRow);
}
} }
} }
@ -1000,31 +1006,20 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
bool keepRes = false; bool keepRes = false;
bool keepInput = false; bool keepInput = false;
if (blkId == LEFT_BLK_ID) { if (blkId == LEFT_BLK_ID) {
if ((jtCtx.joinType == JOIN_TYPE_LEFT || jtCtx.joinType == JOIN_TYPE_FULL) && jtCtx.subType != JOIN_STYPE_SEMI) { if ((jtCtx.joinType == JOIN_TYPE_LEFT || jtCtx.joinType == JOIN_TYPE_FULL) && (jtCtx.subType != JOIN_STYPE_SEMI && jtCtx.subType != JOIN_STYPE_ASOF)) {
keepRes = true; keepRes = true;
} }
peerOffset = MAX_SLOT_NUM; peerOffset = MAX_SLOT_NUM;
} else { } else {
if ((jtCtx.joinType == JOIN_TYPE_RIGHT || jtCtx.joinType == JOIN_TYPE_FULL) && jtCtx.subType != JOIN_STYPE_SEMI) { if ((jtCtx.joinType == JOIN_TYPE_RIGHT || jtCtx.joinType == JOIN_TYPE_FULL) && (jtCtx.subType != JOIN_STYPE_SEMI && jtCtx.subType != JOIN_STYPE_ASOF)) {
keepRes = true; keepRes = true;
} }
tableOffset = MAX_SLOT_NUM; tableOffset = MAX_SLOT_NUM;
} }
if (JOIN_STYPE_ASOF == jtCtx.subType && jtCtx.asofOpType != OP_TYPE_EQUAL) { if (JOIN_STYPE_ASOF == jtCtx.subType) {
keepInput = true; keepInput = jtCtx.asofOpType != OP_TYPE_EQUAL ? true : (blkId == LEFT_BLK_ID);
if (blkId == LEFT_BLK_ID) { pTableRows = (blkId == LEFT_BLK_ID) ? jtCtx.leftRowsList : jtCtx.rightRowsList;
if (NULL == jtCtx.leftRowsList) {
jtCtx.leftRowsList = taosArrayInit(jtCtx.jLimit, jtCtx.inColSize);
}
pTableRows = jtCtx.leftRowsList;
} else {
if (NULL == jtCtx.rightRowsList) {
jtCtx.rightRowsList = taosArrayInit(jtCtx.jLimit, jtCtx.inColSize);
jtCtx.rightFilterOut = taosArrayInit(jtCtx.jLimit, sizeof(bool));
}
pTableRows = jtCtx.rightRowsList;
}
} }
int32_t filterNum = (blkId == LEFT_BLK_ID) ? jtCtx.leftFilterNum : jtCtx.rightFilterNum; int32_t filterNum = (blkId == LEFT_BLK_ID) ? jtCtx.leftFilterNum : jtCtx.rightFilterNum;
@ -1049,12 +1044,15 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
taosArrayPush((blkId == LEFT_BLK_ID) ? jtCtx.leftBlkList : jtCtx.rightBlkList, ppBlk); taosArrayPush((blkId == LEFT_BLK_ID) ? jtCtx.leftBlkList : jtCtx.rightBlkList, ppBlk);
} }
filterOut = (peerFilterNum > 0) ? true : false; filterOut = (peerFilterNum > 0 && jtCtx.subType != JOIN_STYPE_ASOF) ? true : false;
if (!filterOut) { if (!filterOut) {
memset(jtCtx.resColBuf, 0, jtCtx.resColSize); memset(jtCtx.resColBuf, 0, jtCtx.resColSize);
if (keepInput) {
memset(jtCtx.inColBuf, 0, jtCtx.inColSize);
}
} }
addToRowList = false; addToRowList = true;
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
switch (jtInputColType[c]) { switch (jtInputColType[c]) {
@ -1071,7 +1069,7 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
tmpInt = (taosRand() % 2) ? INT_FILTER_VALUE + jtCtx.grpOffset[c] + taosRand() % vRange : INT_FILTER_VALUE - jtCtx.grpOffset[c] - taosRand() % vRange; tmpInt = (taosRand() % 2) ? INT_FILTER_VALUE + jtCtx.grpOffset[c] + taosRand() % vRange : INT_FILTER_VALUE - jtCtx.grpOffset[c] - taosRand() % vRange;
pData = (char*)&tmpInt; pData = (char*)&tmpInt;
isNull = false; isNull = false;
if (filterNum && filterCol[c] && tmpInt <= INT_FILTER_VALUE) { if (!filterOut && filterNum && filterCol[c] && tmpInt <= INT_FILTER_VALUE) {
filterOut = true; filterOut = true;
} }
} else { } else {
@ -1085,7 +1083,7 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
tmpBigint = (taosRand() % 2) ? BIGINT_FILTER_VALUE + jtCtx.curKeyOffset++ : BIGINT_FILTER_VALUE - jtCtx.curKeyOffset++; tmpBigint = (taosRand() % 2) ? BIGINT_FILTER_VALUE + jtCtx.curKeyOffset++ : BIGINT_FILTER_VALUE - jtCtx.curKeyOffset++;
pData = (char*)&tmpBigint; pData = (char*)&tmpBigint;
isNull = false; isNull = false;
if (filterNum && filterCol[c] && tmpBigint <= BIGINT_FILTER_VALUE) { if (!filterOut && filterNum && filterCol[c] && tmpBigint <= BIGINT_FILTER_VALUE) {
filterOut = true; filterOut = true;
} }
break; break;
@ -1103,7 +1101,8 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
} else { } else {
memcpy(jtCtx.inColBuf + jtCtx.inColOffset[c], pData, tDataTypes[jtInputColType[c]].bytes); memcpy(jtCtx.inColBuf + jtCtx.inColOffset[c], pData, tDataTypes[jtInputColType[c]].bytes);
} }
addToRowList = true; } else {
addToRowList = false;
} }
} else if (keepRes && !filterOut && jtCtx.resColList[tableOffset + c]) { } else if (keepRes && !filterOut && jtCtx.resColList[tableOffset + c]) {
if (isNull) { if (isNull) {
@ -1116,8 +1115,10 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
if (keepInput && addToRowList) { if (keepInput && addToRowList) {
taosArrayPush(pTableRows, jtCtx.inColBuf); taosArrayPush(pTableRows, jtCtx.inColBuf);
bool fout = filterOut ? true : false; if (blkId == RIGHT_BLK_ID) {
taosArrayPush(jtCtx.rightFilterOut, &fout); bool fout = filterOut ? true : false;
taosArrayPush(jtCtx.rightFilterOut, &fout);
}
} }
if (keepRes && !filterOut) { if (keepRes && !filterOut) {
@ -1134,14 +1135,14 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
} }
if (keepInput) { if (keepInput) {
if (jtCtx.asofOpType == OP_TYPE_GREATER_EQUAL || jtCtx.asofOpType == OP_TYPE_GREATER_THAN) { if (jtCtx.asofOpType == OP_TYPE_GREATER_EQUAL || jtCtx.asofOpType == OP_TYPE_GREATER_THAN || jtCtx.asofOpType == OP_TYPE_EQUAL) {
if (blkId == LEFT_BLK_ID) { if (blkId == LEFT_BLK_ID) {
appendAllAsofResRows(); appendAllAsofResRows();
} else { } else {
trimForAsofJlimit(); trimForAsofJlimit();
} }
} else { } else {
chkAppendAsofGreaterResRows(); chkAppendAsofGreaterResRows(false);
} }
} }
@ -1798,7 +1799,7 @@ void addAsofEqInRows(int32_t rowsNum, int64_t tbOffset, bool leftTable) {
} }
} }
if (!leftTable) { if (!leftTable && (jtCtx.asofOpType == OP_TYPE_GREATER_EQUAL || jtCtx.asofOpType == OP_TYPE_GREATER_THAN || jtCtx.asofOpType == OP_TYPE_EQUAL)) {
trimForAsofJlimit(); trimForAsofJlimit();
} }
} }
@ -1833,6 +1834,9 @@ void asofJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) {
addAsofEqInRows(leftGrpRows, 0, true); addAsofEqInRows(leftGrpRows, 0, true);
addAsofEqInRows(rightGrpRows, rightTbOffset, false); addAsofEqInRows(rightGrpRows, rightTbOffset, false);
chkAppendAsofGreaterResRows(true); chkAppendAsofGreaterResRows(true);
taosArrayClear(jtCtx.leftRowsList);
taosArrayClear(jtCtx.rightRowsList);
taosArrayClear(jtCtx.rightFilterOut);
break; break;
default: default:
return; return;
@ -2095,6 +2099,10 @@ void createDummyBlkList(int32_t leftMaxRows, int32_t leftMaxGrpRows, int32_t rig
jtCtx.rightFinMatch = (bool*)taosMemoryRealloc(jtCtx.rightFinMatch, maxGrpRows * sizeof(bool)); jtCtx.rightFinMatch = (bool*)taosMemoryRealloc(jtCtx.rightFinMatch, maxGrpRows * sizeof(bool));
} }
taosArrayClear(jtCtx.leftRowsList);
taosArrayClear(jtCtx.rightRowsList);
taosArrayClear(jtCtx.rightFilterOut);
createBothBlkRowsData(); createBothBlkRowsData();
} }
@ -2245,6 +2253,23 @@ char* getInputStatStr(char* inputStat) {
return inputStat; return inputStat;
} }
char* getAsofOpStr() {
switch (jtCtx.asofOpType) {
case OP_TYPE_GREATER_THAN:
return ">";
case OP_TYPE_GREATER_EQUAL:
return ">=";
case OP_TYPE_LOWER_THAN:
return "<";
case OP_TYPE_LOWER_EQUAL:
return "<=";
case OP_TYPE_EQUAL:
return "=";
default:
return "UNKNOWN";
}
}
void printBasicInfo(char* caseName) { void printBasicInfo(char* caseName) {
if (!jtCtrl.printTestInfo) { if (!jtCtrl.printTestInfo) {
return; return;
@ -2257,6 +2282,10 @@ void printBasicInfo(char* caseName) {
jtCtx.leftMaxGrpRows, jtCtx.rightMaxGrpRows, jtCtx.blkRows, jtColCondStr[jtCtx.colCond], jtJoinTypeStr[jtCtx.joinType], jtCtx.leftMaxGrpRows, jtCtx.rightMaxGrpRows, jtCtx.blkRows, jtColCondStr[jtCtx.colCond], jtJoinTypeStr[jtCtx.joinType],
jtSubTypeStr[jtCtx.subType], getInputStatStr(inputStat)); jtSubTypeStr[jtCtx.subType], getInputStatStr(inputStat));
if (JOIN_STYPE_ASOF == jtCtx.subType) {
printf("\t asofOp:%s\n\t JLimit:%" PRId64 "\n", getAsofOpStr(), jtCtx.jLimit);
}
printf("Input Info:\n\t totalBlk:left-%d right-%d\n\t totalRows:left-%d right-%d\n\t " printf("Input Info:\n\t totalBlk:left-%d right-%d\n\t totalRows:left-%d right-%d\n\t "
"blkRowSize:%d\n\t inputCols:left-%s %s %s %s right-%s %s %s %s\n", "blkRowSize:%d\n\t inputCols:left-%s %s %s %s right-%s %s %s %s\n",
(int32_t)taosArrayGetSize(jtCtx.leftBlkList), (int32_t)taosArrayGetSize(jtCtx.rightBlkList), (int32_t)taosArrayGetSize(jtCtx.leftBlkList), (int32_t)taosArrayGetSize(jtCtx.rightBlkList),
@ -2475,7 +2504,11 @@ void initJoinTest() {
offset += tDataTypes[jtInputColType[i]].bytes; offset += tDataTypes[jtInputColType[i]].bytes;
} }
jtCtx.inColSize = offset; jtCtx.inColSize = offset;
jtCtx.inColBuf = taosMemoryMalloc(jtCtx.inColSize); jtCtx.inColBuf = (char*)taosMemoryMalloc(jtCtx.inColSize);
jtCtx.leftRowsList = taosArrayInit(1024, jtCtx.inColSize);
jtCtx.rightRowsList = taosArrayInit(1024, jtCtx.inColSize);
jtCtx.rightFilterOut = taosArrayInit(1024, sizeof(bool));
jtInitLogFile(); jtInitLogFile();
} }
@ -2506,18 +2539,13 @@ void handleTestDone() {
jtCtx.resRows = 0; jtCtx.resRows = 0;
jtCtx.inputStat = 0; jtCtx.inputStat = 0;
taosArrayDestroy(jtCtx.leftRowsList);
taosArrayDestroy(jtCtx.rightRowsList);
jtCtx.leftRowsList = NULL;
jtCtx.rightRowsList = NULL;
} }
void runSingleTest(char* caseName, SJoinTestParam* param) { void runSingleTest(char* caseName, SJoinTestParam* param) {
bool contLoop = true; bool contLoop = true;
SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param); SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param);
createDummyBlkList(10, 10, 10, 10, 3); createDummyBlkList(200, 200, 200, 200, 10);
while (contLoop) { while (contLoop) {
rerunBlockedHere(); rerunBlockedHere();
@ -3093,18 +3121,21 @@ TEST(leftAsofJoin, noCondGreaterThanTest) {
#endif #endif
#if 1 #if 1
TEST(leftAsofJoin, eqCondTest) { TEST(leftAsofJoin, noCondGreaterEqTest) {
SJoinTestParam param; SJoinTestParam param;
char* caseName = "leftAsofJoin:eqCondTest"; char* caseName = "leftAsofJoin:noCondGreaterEqTest";
SExecTaskInfo* pTask = createDummyTaskInfo(caseName); SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
param.pTask = pTask; param.pTask = pTask;
param.joinType = JOIN_TYPE_LEFT; param.joinType = JOIN_TYPE_LEFT;
param.subType = JOIN_STYPE_ANTI; param.subType = JOIN_STYPE_ASOF;
param.cond = TEST_EQ_COND; param.cond = TEST_NO_COND;
param.asofOp = OP_TYPE_GREATER_EQUAL;
param.asc = true; param.asc = true;
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1;
param.filter = false; param.filter = false;
runSingleTest(caseName, &param); runSingleTest(caseName, &param);
@ -3114,9 +3145,94 @@ TEST(leftAsofJoin, eqCondTest) {
printStatInfo(caseName); printStatInfo(caseName);
taosMemoryFree(pTask); taosMemoryFree(pTask);
handleCaseEnd();
} }
#endif #endif
#if 1
TEST(leftAsofJoin, noCondEqTest) {
SJoinTestParam param;
char* caseName = "leftAsofJoin:noCondEqTest";
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
param.pTask = pTask;
param.joinType = JOIN_TYPE_LEFT;
param.subType = JOIN_STYPE_ASOF;
param.cond = TEST_NO_COND;
param.asofOp = OP_TYPE_EQUAL;
param.asc = true;
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1;
param.filter = false;
runSingleTest(caseName, &param);
param.filter = true;
runSingleTest(caseName, &param);
}
printStatInfo(caseName);
taosMemoryFree(pTask);
}
#endif
#if 1
TEST(leftAsofJoin, noCondLowerThanTest) {
SJoinTestParam param;
char* caseName = "leftAsofJoin:noCondLowerThanTest";
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
param.pTask = pTask;
param.joinType = JOIN_TYPE_LEFT;
param.subType = JOIN_STYPE_ASOF;
param.cond = TEST_NO_COND;
param.asofOp = OP_TYPE_LOWER_THAN;
param.asc = true;
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1;
param.filter = false;
runSingleTest(caseName, &param);
param.filter = true;
runSingleTest(caseName, &param);
}
printStatInfo(caseName);
taosMemoryFree(pTask);
}
#endif
#if 1
TEST(leftAsofJoin, noCondLowerEqTest) {
SJoinTestParam param;
char* caseName = "leftAsofJoin:noCondLowerEqTest";
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
param.pTask = pTask;
param.joinType = JOIN_TYPE_LEFT;
param.subType = JOIN_STYPE_ASOF;
param.cond = TEST_NO_COND;
param.asofOp = OP_TYPE_LOWER_EQUAL;
param.asc = true;
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1;
param.filter = false;
runSingleTest(caseName, &param);
param.filter = true;
runSingleTest(caseName, &param);
}
printStatInfo(caseName);
taosMemoryFree(pTask);
}
#endif
#endif #endif