enh: support asof join

This commit is contained in:
dapan1121 2024-01-18 16:50:47 +08:00
parent dec6e3fffe
commit 97567f74cb
3 changed files with 884 additions and 84 deletions

View File

@ -117,6 +117,8 @@ typedef struct SMJoinTableCtx {
int32_t grpIdx;
bool noKeepEqGrpRows;
bool multiEqGrpRows;
int64_t eqRowLimit;
int64_t eqRowNum;
SArray* eqGrps;
SArray* createdBlks;
@ -150,20 +152,37 @@ typedef struct SMJoinGrpRows {
bool readMatch;
} SMJoinGrpRows;
/*
 * Fields shared by the join contexts. SMJoinWindowCtx repeats exactly these
 * members, in this order, at its very start (between its "KEEP IT FIRST"
 * markers), and callers cast an SMJoinWindowCtx* to SMJoinCommonCtx* when
 * calling mJoinNonEqCart(). The member order here must therefore stay in sync
 * with every context struct that is cast this way.
 */
#define MJOIN_COMMON_CTX \
struct SMJoinOperatorInfo* pJoin; \
bool ascTs; \
bool grpRemains; \
SSDataBlock* finBlk; \
bool lastEqGrp; \
bool lastProbeGrp; \
int32_t blkThreshold; \
int64_t jLimit
/* Minimal context view holding only the shared prefix defined above. */
typedef struct SMJoinCommonCtx {
MJOIN_COMMON_CTX;
} SMJoinCommonCtx;
typedef struct SMJoinMergeCtx {
// KEEP IT FIRST
struct SMJoinOperatorInfo* pJoin;
bool ascTs;
bool hashCan;
bool keepOrder;
bool grpRemains;
bool midRemains;
bool nmatchRemains;
bool lastEqGrp;
bool ascTs;
bool grpRemains;
SSDataBlock* finBlk;
bool lastEqGrp;
bool lastProbeGrp;
int32_t blkThreshold;
int64_t jLimit;
// KEEP IT FIRST
bool hashCan;
bool keepOrder;
bool midRemains;
bool nmatchRemains;
SSDataBlock* midBlk;
SSDataBlock* finBlk;
int64_t lastEqTs;
SMJoinGrpRows probeNEqGrp;
SMJoinGrpRows buildNEqGrp;
@ -172,31 +191,53 @@ typedef struct SMJoinMergeCtx {
joinCartFp mergeCartFp;
} SMJoinMergeCtx;
/*
 * Cache mode selector for the window-join cache.
 * NOTE(review): no use of these enumerators is visible in this chunk; the
 * names suggest "no cache", "rows copied into the cache's own output block"
 * and "rows referenced in the input block" -- confirm against the users.
 */
typedef enum {
E_CACHE_NONE = 0,
E_CACHE_OUTBLK,
E_CACHE_INBLK
} SMJoinCacheMode;
/*
 * Row-group descriptor for ASOF join.
 * NOTE(review): this type is not referenced by any code visible in this
 * chunk; the field notes below are inferred from the names only -- confirm.
 */
typedef struct SAsofJoinGrpRows {
SSDataBlock* blk;       /* block the rows live in */
bool clonedBlk;         /* presumably: blk is a private copy -- TODO confirm */
int32_t blkRowIdx;      /* presumably: row position inside blk -- TODO confirm */
int32_t readIdx;        /* presumably: next row to be read -- TODO confirm */
} SAsofJoinGrpRows;
/*
 * Build-side row cache used by the ASOF (window) join.
 * NOTE(review): this listing comes from a diff view and may mix removed and
 * added members; fields without a note below have no visible user in this
 * chunk.
 */
typedef struct SMJoinWinCache {
int32_t pageLimit;      /* set to MJOIN_BLK_SIZE_LIMIT by mJoinInitWinCache() */
int64_t rowsNum;
int32_t rowOffset;
int32_t outBlkIdx;
int32_t outRowOffset;
int32_t outRowIdx;      /* resume position inside outBlk while dumping (mAsofLowerDumpGrpCache) */
int32_t colNum;         /* set to build->finNum by mJoinInitWinCache() */
SSDataBlock* blk;       /* NOTE(review): init code now fills outBlk instead; possibly a removed member */
int32_t rowNum;         /* rows counted by MJOIN_PUSH_BLK_TO_CACHE, decremented as build rows are skipped */
int8_t grpIdx;          /* index into grps of the group currently being dumped */
SArray* grps;           /* array of SMJoinGrpRows, one entry per cached build block */
SSDataBlock* outBlk;    /* cache-owned block; capacity is set to jLimit at init */
} SMJoinWinCache;
typedef struct SMJoinWindowCtx {
// KEEP IT FIRST
struct SMJoinOperatorInfo* pJoin;
bool ascTs;
bool grpRemains;
SSDataBlock* finBlk;
bool lastEqGrp;
bool lastProbeGrp;
int32_t blkThreshold;
int64_t jLimit;
// KEEP IT FIRST
int32_t asofOpType;
bool asofLowerRow;
bool asofEqRow;
bool asofGreaterRow;
int64_t jLimit;
int32_t blkThreshold;
SSDataBlock* finBlk;
bool grpRemains;
bool eqPostDone;
int64_t lastTs;
bool rowRemains;
SMJoinGrpRows probeGrp;
SMJoinGrpRows buildGrp;
SMJoinWinCache cache;
} SMJoinWindowCtx;
@ -252,8 +293,8 @@ typedef struct SMJoinOperatorInfo {
#define SET_SAME_TS_GRP_HJOIN(_pair, _octx) ((_pair)->hashJoin = (_octx)->hashCan && REACH_HJOIN_THRESHOLD(_pair))
/*
 * Timestamp ordering predicates; `_order` is true for ascending input.
 * The whole expansion is parenthesised so the macros stay a single operand
 * when combined with `!`, `&&` or `==` at the call site.
 */
#define PROBE_TS_UNREACH(_order, _pts, _bts) (((_order) && (_pts) < (_bts)) || (!(_order) && (_pts) > (_bts)))
#define PROBE_TS_OVER(_order, _pts, _bts) (((_order) && (_pts) > (_bts)) || (!(_order) && (_pts) < (_bts)))
/*
 * PROBE_TS_LOWER:   the probe timestamp sorts before the build timestamp.
 * PROBE_TS_GREATER: the probe timestamp sorts after the build timestamp.
 * `_order` is true for ascending input. The whole expansion is parenthesised
 * so that `!PROBE_TS_GREATER(...)` negates the complete predicate instead of
 * only its first clause (which gave the wrong answer for descending order).
 */
#define PROBE_TS_LOWER(_order, _pts, _bts) (((_order) && (_pts) < (_bts)) || (!(_order) && (_pts) > (_bts)))
#define PROBE_TS_GREATER(_order, _pts, _bts) (((_order) && (_pts) > (_bts)) || (!(_order) && (_pts) < (_bts)))
/* Number of rows of a group that are still unread: [readIdx, endIdx], both inclusive. */
#define GRP_REMAIN_ROWS(_grp) ((_grp)->endIdx - (_grp)->readIdx + 1)
/* True once the read cursor has moved past the last row of the group. */
#define GRP_DONE(_grp) ((_grp)->readIdx > (_grp)->endIdx)
@ -268,13 +309,44 @@ typedef struct SMJoinOperatorInfo {
#define MJOIN_SET_ROW_BITMAP(_b, _base, _idx) colDataClearNull_f((_b + _base), _idx)
#define ASOF_EQ_ROW_INCLUDED(_op) (OP_TYPE_GREATER_EQUAL == (_op) || OP_TYPE_LOWER_EQUAL == (_op) || OP_TYPE_EQUAL == (_op))
#define ASOF_LOWER_ROW_INCLUDED(_op) (OP_TYPE_LOWER_EQUAL == (_op) || OP_TYPE_LOWER_THAN == (_op))
#define ASOF_GREATER_ROW_INCLUDED(_op) (OP_TYPE_GREATER_EQUAL == (_op) || OP_TYPE_GREATER_THAN == (_op))
#define ASOF_LOWER_ROW_INCLUDED(_op) (OP_TYPE_GREATER_EQUAL == (_op) || OP_TYPE_GREATER_THAN == (_op))
#define ASOF_GREATER_ROW_INCLUDED(_op) (OP_TYPE_LOWER_EQUAL == (_op) || OP_TYPE_LOWER_THAN == (_op))
/*
 * Append `_blk` to the cache as a new row group covering the whole block and
 * add its rows to the cache row counter. At most one group may already be
 * cached when this is called.
 *
 * The reserved slot is fully initialised (begin/read/end) because the dump
 * code applies GRP_REMAIN_ROWS() to cached groups and compares readIdx with
 * beginIdx.
 * NOTE(review): assumes taosArrayReserve() returns a pointer to the new,
 * not necessarily zeroed, element -- confirm against the SArray API.
 */
#define MJOIN_PUSH_BLK_TO_CACHE(_cache, _blk)                                   \
  do {                                                                          \
    ASSERT(taosArrayGetSize((_cache)->grps) <= 1);                              \
    SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayReserve((_cache)->grps, 1);  \
    (_cache)->rowNum += (_blk)->info.rows;                                      \
    pGrp->blk = (_blk);                                                         \
    pGrp->beginIdx = 0;                                                         \
    pGrp->readIdx = 0;                                                          \
    pGrp->endIdx = (_blk)->info.rows - 1;                                       \
  } while (0)
/*
 * Point the table context `_tb` at the first cached row group: its block and
 * begin index become the table's current block and row cursor. With an empty
 * cache the table is left with no block and a zero cursor.
 */
#define MJOIN_RESTORE_TB_BLK(_cache, _tb)                         \
  do {                                                            \
    SMJoinGrpRows* pGrp = taosArrayGet((_cache)->grps, 0);        \
    if (NULL == pGrp) {                                           \
      (_tb)->blk = NULL;                                          \
      (_tb)->blkRowIdx = 0;                                       \
      break;                                                      \
    }                                                             \
    (_tb)->blk = pGrp->blk;                                       \
    (_tb)->blkRowIdx = pGrp->beginIdx;                            \
  } while (0)
/*
 * Drop the first cached row group, if any. When that group refers to the
 * cache's own output block, the block is cleaned up before the entry is
 * removed from the array.
 */
#define MJOIN_POP_TB_BLK(_cache)                                  \
  do {                                                            \
    SMJoinGrpRows* pGrp = taosArrayGet((_cache)->grps, 0);        \
    if (NULL == pGrp) {                                           \
      break;                                                      \
    }                                                             \
    if ((_cache)->outBlk == pGrp->blk) {                          \
      blockDataCleanup(pGrp->blk);                                \
    }                                                             \
    taosArrayPopFrontBatch((_cache)->grps, 1);                    \
  } while (0)
#define MJOIN_GET_TB_COL_TS(_col, _ts, _tb) \
do { \
if (NULL != (_tb)->blk && (_tb)->blkRowIdx < (_tb)->blk->info.rows) { \
if (NULL != (_tb)->blk && (_tb)->blkRowIdx < (_tb)->blk->info.rows) { \
(_col) = taosArrayGet((_tb)->blk->pDataBlock, (_tb)->primCol->srcSlot); \
(_ts) = *((int64_t*)(_col)->pData + (_tb)->blkRowIdx); \
} else { \
@ -331,8 +403,8 @@ int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBl
int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startGrpIdx, int32_t startRowIdx);
int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startRowIdx);
int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t *rowBitmapOffset);
int32_t mJoinProcessUnreachGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs);
int32_t mJoinProcessOverGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs);
int32_t mJoinProcessLowerGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs);
int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs);
int32_t mJoinFilterAndKeepSingleRow(SSDataBlock* pBlock, SFilterInfo* pFilterInfo);
int32_t mJoinFilterAndNoKeepRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo);

View File

@ -480,15 +480,15 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
} else if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) {
MJ_ERR_JRET(mJoinProcessUnreachGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
} else if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) {
MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
} else {
while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build);
if (PROBE_TS_OVER(pCtx->ascTs, probeTs, buildTs)) {
if (PROBE_TS_GREATER(pCtx->ascTs, probeTs, buildTs)) {
continue;
}
@ -711,7 +711,7 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) {
continue;
}
if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) {
if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) {
if (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
continue;
@ -1074,10 +1074,10 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
continue;
}
if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) {
MJ_ERR_JRET(mJoinProcessUnreachGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) {
MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
} else {
MJ_ERR_JRET(mJoinProcessOverGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs));
MJ_ERR_JRET(mJoinProcessGreaterGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs));
}
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
@ -1410,7 +1410,7 @@ SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator) {
continue;
}
if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) {
if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) {
if (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
continue;
@ -1671,15 +1671,15 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) {
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
} else if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) {
MJ_ERR_JRET(mJoinProcessUnreachGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
} else if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) {
MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
} else {
while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build);
if (PROBE_TS_OVER(pCtx->ascTs, probeTs, buildTs)) {
if (PROBE_TS_GREATER(pCtx->ascTs, probeTs, buildTs)) {
continue;
}
@ -1727,24 +1727,340 @@ static bool mAsofJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoi
mJoinSetDone(pOperator);
return false;
}
break;
} while (true);
if (buildGot) {
SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot);
SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot);
if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) {
pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
pCtx->probeGrp.blk = pJoin->probe->blk;
pCtx->buildGrp.blk = pJoin->build->blk;
return true;
}
/*
 * Decide how many of `newRows` incoming rows go into the cache and how many
 * already-cached rows must be evicted so the cache holds at most `jLimit`.
 *
 * @param pCache     window cache; only outBlk->info.rows is read
 * @param jLimit     maximum number of rows the cache may hold
 * @param newRows    number of candidate rows
 * @param evictRows  out: number of cached rows to drop from the front
 * @return number of new rows to append
 */
int32_t mAsofJoinCalcRowNum(SMJoinWinCache* pCache, int64_t jLimit, int32_t newRows, int32_t* evictRows) {
  int64_t cachedRows = pCache->outBlk->info.rows;

  *evictRows = 0;

  /* Empty cache: nothing to evict, take as many rows as the limit allows. */
  if (cachedRows <= 0) {
    return TMIN(jLimit, newRows);
  }

  /* Everything fits next to what is already cached. */
  if (cachedRows + newRows <= jLimit) {
    return newRows;
  }

  /* The new rows alone fill the cache: replace its whole content. */
  if (newRows >= jLimit) {
    *evictRows = cachedRows;
    return jLimit;
  }

  /* Partial overflow: evict just enough of the oldest rows. */
  *evictRows = cachedRows + newRows - jLimit;
  return newRows;
}
/*
 * Append rows of `pGrp` ([beginIdx, endIdx]) to the cache output block,
 * trimming the oldest cached rows first when the join limit would be exceeded.
 *
 * @param fromBegin  true: take the accepted rows from the start of the group;
 *                   false: take them from the end of the group
 * @return result of the block trim/merge call
 */
int32_t mAsofJoinAddRowsToCache(SMJoinWindowCtx* pCtx, SMJoinGrpRows* pGrp, bool fromBegin) {
  SMJoinWinCache* pCache = &pCtx->cache;
  int32_t         dropRows = 0;
  int32_t         grpRows = pGrp->endIdx - pGrp->beginIdx + 1;
  int32_t         keepRows = mAsofJoinCalcRowNum(pCache, pCtx->jLimit, grpRows, &dropRows);

  if (dropRows > 0) {
    MJ_ERR_RET(blockDataTrimFirstRows(pCache->outBlk, dropRows));
  }

  int32_t firstIdx;
  if (fromBegin) {
    firstIdx = pGrp->beginIdx;
  } else {
    firstIdx = pGrp->endIdx - keepRows + 1;
  }

  return blockDataMergeNRows(pCache->outBlk, pGrp->blk, firstIdx, keepRows);
}
/*
 * Build the group of consecutive rows in the table's current block whose
 * primary timestamp equals `timestamp`, starting at pTable->blkRowIdx.
 *
 * On return pTable->blkRowIdx points at the first row after the group (or at
 * info.rows when the group runs to the end of the block).
 *
 * @param pTable     table context; its row cursor is advanced
 * @param timestamp  timestamp the group must match
 * @param wholeBlk   optional out flag, set to true only when the group reaches
 *                   the last row of the block; may be NULL
 * @param pGrp       out: beginIdx/readIdx/endIdx of the group; left untouched
 *                   when the current row does not match `timestamp`
 * @return TSDB_CODE_SUCCESS
 */
int32_t mAsofJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) {
  SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot);

  if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
    return TSDB_CODE_SUCCESS;
  }

  pGrp->beginIdx = pTable->blkRowIdx;
  pGrp->readIdx = pTable->blkRowIdx;

  /* Only scan when the last row differs; otherwise the group spans the rest of the block. */
  char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
  if (timestamp != *(int64_t*)pEndVal) {
    for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
      char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
      if (timestamp == *(int64_t*)pNextVal) {
        continue;
      }

      /* First non-matching row: the group ends on the row before it. */
      pGrp->endIdx = pTable->blkRowIdx - 1;
      return TSDB_CODE_SUCCESS;
    }
  }

  pGrp->endIdx = pTable->blk->info.rows - 1;
  pTable->blkRowIdx = pTable->blk->info.rows;

  if (wholeBlk) {
    *wholeBlk = true;
  }

  return TSDB_CODE_SUCCESS;
}
/*
 * Handle the case where the probe timestamp sorts after the current build
 * timestamp: collect the run of build rows that the probe row is still
 * "greater" than into pCtx->buildGrp, describe the current probe row as a
 * one-row group in pCtx->probeGrp, and hand over to
 * mAsofJoinHandleProbeGreater().
 *
 * @param pCol     primary timestamp column of the current build block
 * @param probeTs  in: current probe timestamp
 * @param buildTs  in/out: updated to the timestamp of each build row inspected
 */
int32_t mAsofJoinProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) {
  SMJoinTableCtx* pBuild = pJoin->build;
  SMJoinGrpRows*  pBuildGrp = &pCtx->buildGrp;
  SMJoinGrpRows*  pProbeGrp = &pCtx->probeGrp;

  pBuildGrp->beginIdx = pBuild->blkRowIdx;
  pBuildGrp->readIdx = pBuildGrp->beginIdx;
  pBuildGrp->endIdx = pBuildGrp->beginIdx;

  for (++pBuild->blkRowIdx; pBuild->blkRowIdx < pBuild->blk->info.rows; ++pBuild->blkRowIdx) {
    MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pBuild);
    if (!(PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs))) {
      break;
    }
    pBuildGrp->endIdx = pBuild->blkRowIdx;
  }

  pProbeGrp->beginIdx = pJoin->probe->blkRowIdx;
  pProbeGrp->readIdx = pProbeGrp->beginIdx;
  pProbeGrp->endIdx = pProbeGrp->beginIdx;

  return mAsofJoinHandleProbeGreater(pCtx);
}
/*
 * Feed every row of `pTable` whose primary timestamp equals `timestamp` into
 * the window cache, following the run across block boundaries by fetching
 * further blocks from downstream.
 *
 * Rows are only added while fewer than jLimit equal rows have been seen; the
 * table cursor is always advanced past the whole run. When downstream is
 * exhausted, pTable->dsFetchDone is set.
 *
 * @return TSDB_CODE_SUCCESS or the error of mAsofJoinAddRowsToCache()
 */
int32_t mAsofJoinAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp) {
  int64_t       eqRowsNum = 0;
  SMJoinGrpRows grp = {.blk = pTable->blk};

  do {
    /* pTable->blk is replaced below when the run crosses a block boundary,
     * so the group must follow the current block on every iteration. */
    grp.blk = pTable->blk;

    SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot);

    if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
      return TSDB_CODE_SUCCESS;
    }

    grp.beginIdx = pTable->blkRowIdx;

    char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
    if (timestamp != *(int64_t*)pEndVal) {
      /* The run ends inside this block: stop on the first different timestamp. */
      for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
        char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
        if (timestamp == *(int64_t*)pNextVal) {
          continue;
        }

        break;
      }

      grp.endIdx = pTable->blkRowIdx - 1;
    } else {
      /* The run covers the rest of the block. */
      grp.endIdx = pTable->blk->info.rows - 1;
      pTable->blkRowIdx = pTable->blk->info.rows;
    }

    if (eqRowsNum < pCtx->jLimit) {
      MJ_ERR_RET(mAsofJoinAddRowsToCache(pCtx, &grp, false));
    }

    eqRowsNum += grp.endIdx - grp.beginIdx + 1;

    if (pTable->blkRowIdx != pTable->blk->info.rows) {
      break;
    }

    /* Block consumed: the run may continue in the next downstream block. */
    pTable->blk = getNextBlockFromDownstreamRemain(pOperator, pTable->downStreamIdx);
    qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);

    pTable->blkRowIdx = 0;

    if (NULL == pTable->blk) {
      pTable->dsFetchDone = true;
      break;
    }
  } while (true);

  return TSDB_CODE_SUCCESS;
}
/*
 * Emit the cartesian product of the pending probe rows (pCtx->probeGrp,
 * [readIdx, endIdx]) and the rows cached in pCtx->cache.outBlk into
 * pCtx->finBlk, limited by the free capacity of finBlk.
 *
 * With an empty cache the probe rows are emitted through mJoinNonEqCart().
 * When finBlk fills up, the progress is kept in probeGrp->readIdx (current
 * probe row) and cache.outRowIdx (next cached row for that probe row), and
 * pCtx->grpRemains is set so a later call resumes from there.
 *
 * @return TSDB_CODE_SUCCESS or the error of the cart helpers
 */
int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) {
  SMJoinGrpRows*  probeGrp = &pCtx->probeGrp;
  SMJoinWinCache* pCache = &pCtx->cache;

  if (pCache->outBlk->info.rows <= 0) {
    return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, probeGrp, true);
  }

  int32_t       rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
  SMJoinGrpRows buildGrp = {.blk = pCache->outBlk, .readIdx = pCache->outRowIdx, .endIdx = pCache->outBlk->info.rows - 1};
  int32_t       probeRows = GRP_REMAIN_ROWS(probeGrp);
  int32_t       probeEndIdx = probeGrp->endIdx;

  /* The first pending probe row may already have been emitted up to outRowIdx. */
  int64_t totalResRows = (0 == pCache->outRowIdx)
                             ? ((int64_t)probeRows * pCache->outBlk->info.rows)
                             : (pCache->outBlk->info.rows - pCache->outRowIdx + (int64_t)(probeRows - 1) * pCache->outBlk->info.rows);

  if (totalResRows <= rowsLeft) {
    if (0 == pCache->outRowIdx) {
      MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));

      pCtx->grpRemains = false;
      return TSDB_CODE_SUCCESS;
    }

    /* Finish the partly emitted probe row, then emit the remaining rows in full. */
    probeGrp->endIdx = probeGrp->readIdx;
    MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
    if (++probeGrp->readIdx <= probeEndIdx) {
      probeGrp->endIdx = probeEndIdx;
      buildGrp.readIdx = 0;
      MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
    }

    /* Nothing is pending any more, so the next dump must start at cache row 0. */
    pCache->outRowIdx = 0;
    pCtx->grpRemains = false;
    return TSDB_CODE_SUCCESS;
  }

  while (!GRP_DONE(probeGrp) && rowsLeft > 0) {
    if (0 == pCache->outRowIdx) {
      /* Emit as many complete probe rows as still fit. */
      int32_t grpNum = rowsLeft / pCache->outBlk->info.rows;
      if (grpNum > 0) {
        probeGrp->endIdx = probeGrp->readIdx + grpNum - 1;
        buildGrp.readIdx = 0;
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));

        rowsLeft -= grpNum * pCache->outBlk->info.rows;
        pCache->outRowIdx = 0;
        probeGrp->readIdx += grpNum;
        continue;
      }
    }

    probeGrp->endIdx = probeGrp->readIdx;
    buildGrp.readIdx = pCache->outRowIdx;

    int32_t grpRemainRows = pCache->outBlk->info.rows - pCache->outRowIdx;
    if (rowsLeft >= grpRemainRows) {
      MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));

      rowsLeft -= grpRemainRows;
      pCache->outRowIdx = 0;
      /* This probe row is complete; the other branches advance readIdx
       * explicitly after a merge, so do the same here. */
      ++probeGrp->readIdx;
      continue;
    }

    /* Only part of the cached rows fits for this probe row: remember where to resume. */
    buildGrp.endIdx = buildGrp.readIdx + rowsLeft - 1;
    MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
    pCache->outRowIdx += rowsLeft;
    break;
  }

  probeGrp->endIdx = probeEndIdx;
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;

  return TSDB_CODE_SUCCESS;
}
/*
 * Dump the cache against the current equal-timestamp probe group and pull
 * the build rows with the same timestamp (pCtx->lastTs) into the cache.
 *
 * When equal rows are part of the join result (asofEqRow) they are cached
 * before dumping; otherwise the dump happens first and the rows are cached
 * afterwards, once the dump has completed. The caching step runs at most once
 * per equal group (eqPostDone) and is skipped for the last build group.
 */
int32_t mAsofLowerDumpUpdateEqRows(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, bool lastBuildGrp) {
  if (pCtx->asofEqRow) {
    if (!(pCtx->eqPostDone || lastBuildGrp)) {
      pCtx->eqPostDone = true;
      MJ_ERR_RET(mAsofJoinAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs));
    }

    return mAsofLowerDumpGrpCache(pCtx);
  }

  MJ_ERR_RET(mAsofLowerDumpGrpCache(pCtx));

  /* Dump not finished, or the equal rows were already handled / not wanted. */
  if (pCtx->grpRemains || pCtx->eqPostDone || lastBuildGrp) {
    return TSDB_CODE_SUCCESS;
  }

  pCtx->eqPostDone = true;
  return mAsofJoinAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs);
}
/*
 * Process a probe group whose timestamp equals the build timestamp: build the
 * equal probe group into pCtx->probeGrp and dump/update the cache for it.
 *
 * @param timestamp     the shared timestamp
 * @param lastBuildGrp  true when no further equal build rows must be cached
 */
int32_t mAsofLowerProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) {
  SMJoinOperatorInfo* pJoin = pCtx->pJoin;

  pCtx->lastEqGrp = true;
  pCtx->eqPostDone = false;

  /* mAsofJoinBuildEqGrp() takes (table, ts, wholeBlk, grp); the whole-block
   * flag is not needed here, so pass NULL and give the group as 4th argument. */
  MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, NULL, &pCtx->probeGrp));

  return mAsofLowerDumpUpdateEqRows(pCtx, pJoin, lastBuildGrp);
}
/*
 * Collect the run of probe rows that sort before the current build timestamp
 * into pCtx->probeGrp and dump the cache against them.
 *
 * @param pCol     primary timestamp column of the current probe block
 * @param probeTs  in/out: updated to the timestamp of each probe row inspected
 * @param buildTs  in: current build timestamp
 */
int32_t mAsofLowerProcessLowerGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) {
  SMJoinTableCtx* pProbe = pJoin->probe;
  SMJoinGrpRows*  pProbeGrp = &pCtx->probeGrp;

  pCtx->lastEqGrp = false;

  pProbeGrp->beginIdx = pProbe->blkRowIdx;
  pProbeGrp->readIdx = pProbeGrp->beginIdx;
  pProbeGrp->endIdx = pProbeGrp->beginIdx;

  for (++pProbe->blkRowIdx; pProbe->blkRowIdx < pProbe->blk->info.rows; ++pProbe->blkRowIdx) {
    MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pProbe);
    if (!(PROBE_TS_LOWER(pCtx->ascTs, *probeTs, *buildTs))) {
      break;
    }
    pProbeGrp->endIdx = pProbe->blkRowIdx;
  }

  return mAsofLowerDumpGrpCache(pCtx);
}
/*
 * Handle a probe row that sorts after the current build timestamp: collect
 * the run of build rows the probe row is still "greater" than, add them to
 * the cache, and dump the cache against the current probe row.
 *
 * @param pCol     primary timestamp column of the current build block
 * @param probeTs  in: current probe timestamp
 * @param buildTs  in/out: updated to the timestamp of each build row inspected
 */
int32_t mAsofLowerProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) {
  SMJoinTableCtx* pBuild = pJoin->build;
  SMJoinGrpRows*  pBuildGrp = &pCtx->buildGrp;
  SMJoinGrpRows*  pProbeGrp = &pCtx->probeGrp;

  pCtx->lastEqGrp = false;

  pBuildGrp->beginIdx = pBuild->blkRowIdx;
  pBuildGrp->readIdx = pBuildGrp->beginIdx;
  pBuildGrp->endIdx = pBuildGrp->beginIdx;

  for (++pBuild->blkRowIdx; pBuild->blkRowIdx < pBuild->blk->info.rows; ++pBuild->blkRowIdx) {
    MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pBuild);
    if (!(PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs))) {
      break;
    }
    pBuildGrp->endIdx = pBuild->blkRowIdx;
  }

  /* The current probe row forms a one-row group. */
  pProbeGrp->beginIdx = pJoin->probe->blkRowIdx;
  pProbeGrp->readIdx = pProbeGrp->beginIdx;
  pProbeGrp->endIdx = pProbeGrp->beginIdx;

  MJ_ERR_RET(mAsofJoinAddRowsToCache(pCtx, pBuildGrp, false));

  return mAsofLowerDumpGrpCache(pCtx);
}
/*
 * Resume a dump that was interrupted because the result block was full.
 * An interrupted equal-timestamp group goes through the equal-rows path,
 * everything else continues with a plain cache dump.
 */
int32_t mAsofLowerHandleGrpRemains(SMJoinWindowCtx* pCtx) {
  if (pCtx->lastEqGrp) {
    return mAsofLowerDumpUpdateEqRows(pCtx, pCtx->pJoin, false);
  }

  return mAsofLowerDumpGrpCache(pCtx);
}
/*
 * Make sure the probe side (and, when needed, the build side) has a current
 * block, and bind both blocks to the context's row groups.
 *
 * The build side is only retrieved when a probe block was obtained or the
 * build downstream still needs its first fetch.
 *
 * @return false when the probe side is exhausted (the operator is then marked
 *         done), true otherwise
 */
static bool mAsofLowerRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
  bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);

  if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
    /* Whether a build block was obtained does not affect this function's result. */
    (void)mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
  }

  if (!probeGot) {
    mJoinSetDone(pOperator);
    return false;
  }

  pCtx->probeGrp.blk = pJoin->probe->blk;
  pCtx->buildGrp.blk = pJoin->build->blk;

  return true;
}
SSDataBlock* mAsofJoinDo(struct SOperatorInfo* pOperator) {
SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoin = pOperator->info;
SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
int32_t code = TSDB_CODE_SUCCESS;
@ -1756,7 +2072,7 @@ SSDataBlock* mAsofJoinDo(struct SOperatorInfo* pOperator) {
blockDataCleanup(pCtx->finBlk);
if (pCtx->grpRemains) {
MJ_ERR_JRET(mAsofJoinHandleGrpRemains(pCtx));
MJ_ERR_JRET(mAsofLowerHandleGrpRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
@ -1764,15 +2080,15 @@ SSDataBlock* mAsofJoinDo(struct SOperatorInfo* pOperator) {
}
do {
if (!mAsofJoinRetrieve(pOperator, pJoin, pCtx)) {
if (!mAsofLowerRetrieve(pOperator, pJoin, pCtx)) {
break;
}
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
if (probeTs == pCtx->lastEqTs) {
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
if (probeTs == pCtx->lastTs) {
MJ_ERR_JRET(mAsofLowerProcessEqualGrp(pCtx, probeTs, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
@ -1786,40 +2102,413 @@ SSDataBlock* mAsofJoinDo(struct SOperatorInfo* pOperator) {
while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
if (probeTs == buildTs) {
pCtx->lastEqTs = probeTs;
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
pCtx->lastTs = probeTs;
MJ_ERR_JRET(mAsofLowerProcessEqualGrp(pCtx, probeTs, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
} else if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) {
MJ_ERR_JRET(mJoinProcessUnreachGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
continue;
}
if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) {
MJ_ERR_JRET(mAsofLowerProcessLowerGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs));
} else {
while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build);
if (PROBE_TS_OVER(pCtx->ascTs, probeTs, buildTs)) {
continue;
}
break;
}
MJ_ERR_JRET(mAsofLowerProcessGreaterGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs));
}
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
}
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) {
pCtx->probeNEqGrp.blk = pJoin->probe->blk;
pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
pCtx->probeNEqGrp.endIdx = pJoin->probe->blk->info.rows - 1;
pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx;
pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx;
pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1;
MJ_ERR_JRET(mAsofLowerDumpGrpCache(pCtx));
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
}
} while (true);
_return:
if (code) {
pJoin->errCode = code;
return NULL;
}
return pCtx->finBlk;
}
/*
 * Emit the cartesian product of the pending probe rows (pCtx->probeGrp) and
 * the build row groups held in pCtx->cache.grps into pCtx->finBlk, limited by
 * the free capacity of finBlk.
 *
 * With no cached build rows the probe rows are emitted through
 * mJoinNonEqCart(). When finBlk fills up, progress is kept in
 * probeGrp->readIdx, cache->grpIdx and the readIdx of the partly emitted
 * build group, and pCtx->grpRemains is set so a later call can resume.
 *
 * @return TSDB_CODE_SUCCESS or the error of the cart helpers
 */
int32_t mAsofGreaterDumpGrpCache(SMJoinWindowCtx* pCtx) {
  int64_t         rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
  SMJoinWinCache* cache = &pCtx->cache;
  int32_t         buildGrpNum = taosArrayGetSize(cache->grps);
  int64_t         buildTotalRows = (cache->rowNum > pCtx->jLimit) ? pCtx->jLimit : cache->rowNum;
  if (buildGrpNum <= 0 || buildTotalRows <= 0) {
    return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true);
  }

  SMJoinGrpRows* probeGrp = &pCtx->probeGrp;
  int32_t        probeRows = GRP_REMAIN_ROWS(probeGrp);
  int32_t        probeEndIdx = probeGrp->endIdx;

  /* Fast path: nothing was emitted yet and the whole product fits. */
  if (0 == cache->grpIdx && probeRows * buildTotalRows <= rowsLeft) {
    SMJoinGrpRows* pFirstBuild = taosArrayGet(cache->grps, 0);
    if (pFirstBuild->readIdx == pFirstBuild->beginIdx) {
      for (; cache->grpIdx < buildGrpNum; ++cache->grpIdx) {
        SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, cache->grpIdx);
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
        buildGrp->readIdx = buildGrp->beginIdx;
      }

      pCtx->grpRemains = false;
      return TSDB_CODE_SUCCESS;
    }
  }

  /* Slow path: one probe row at a time against each cached build group. */
  for (; !GRP_DONE(probeGrp); ) {
    probeGrp->endIdx = probeGrp->readIdx;
    for (; cache->grpIdx < buildGrpNum && rowsLeft > 0; ++cache->grpIdx) {
      SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, cache->grpIdx);

      if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
        rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
        buildGrp->readIdx = buildGrp->beginIdx;
        continue;
      }

      /* Only part of this build group fits: temporarily shorten it, and put
       * the real end back before acting on the merge result. */
      int32_t buildEndIdx = buildGrp->endIdx;
      buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
      int32_t code = mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp);
      buildGrp->endIdx = buildEndIdx;
      MJ_ERR_RET(code);

      buildGrp->readIdx += rowsLeft;
      rowsLeft = 0;
      break;
    }

    probeGrp->endIdx = probeEndIdx;

    /* All build groups were emitted for this probe row: move to the next one. */
    if (cache->grpIdx >= buildGrpNum) {
      cache->grpIdx = 0;
      ++probeGrp->readIdx;
    }

    if (rowsLeft <= 0) {
      break;
    }
  }

  probeGrp->endIdx = probeEndIdx;
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;

  return TSDB_CODE_SUCCESS;
}
/*
 * Top the cache up with further build blocks until it counts at least jLimit
 * rows or the build downstream is exhausted. Does nothing when the cache is
 * already full enough or the build side has been fetched completely.
 * Always returns TSDB_CODE_SUCCESS unless a block merge fails.
 */
int32_t mAsofGreaterChkFillGrpCache(SMJoinWindowCtx* pCtx) {
if (pCtx->cache.rowNum >= pCtx->jLimit || pCtx->pJoin->build->dsFetchDone) {
return TSDB_CODE_SUCCESS;
}
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinWinCache* pCache = &pCtx->cache;
int32_t grpNum = taosArrayGetSize(pCache->grps);
ASSERT(grpNum >= 1 && grpNum <= 2);
SSDataBlock* pBlk = NULL;
SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, grpNum - 1);
/* If the last cached group refers to an input block rather than the cache's
   own outBlk, take it out of the array; its rows are merged into outBlk below. */
if (pGrp->blk != pCache->outBlk) {
pBlk = pGrp->blk;
taosArrayPop(pCache->grps);
}
do {
/* Copy the unread tail of the previous input block into outBlk before the
   table moves on to the next downstream block.
   NOTE(review): presumably because the input block is not valid after the
   next fetch -- confirm. */
if (NULL != pBlk) {
MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, pBlk, build->blkRowIdx, pBlk->info.rows - build->blkRowIdx));
}
build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, build->downStreamIdx);
qDebug("%s merge join %s table got block to fill grp, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0);
build->blkRowIdx = 0;
if (NULL == build->blk) {
build->dsFetchDone = true;
break;
}
/* The macro also adds the new block's rows to pCache->rowNum, which drives
   the loop condition. */
MJOIN_PUSH_BLK_TO_CACHE(pCache, build->blk);
pBlk = build->blk;
} while (pCache->rowNum < pCtx->jLimit);
/* Re-point the build table at the first cached group (or at no block). */
MJOIN_RESTORE_TB_BLK(pCache, build);
return TSDB_CODE_SUCCESS;
}
/*
 * Dump the cache against the current probe group; unless this is the last
 * build group, the cache is topped up from downstream first.
 */
int32_t mAsofGreaterFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) {
  if (lastBuildGrp) {
    return mAsofGreaterDumpGrpCache(pCtx);
  }

  MJ_ERR_RET(mAsofGreaterChkFillGrpCache(pCtx));

  return mAsofGreaterDumpGrpCache(pCtx);
}
/*
 * Advance the table's row cursor past the rows of the current block whose
 * primary timestamp equals `timestamp`.
 *
 * @param pTable     table context; blkRowIdx is moved to the first row with a
 *                   different timestamp when one exists in this block
 * @param timestamp  timestamp to skip
 * @param wholeBlk   out: true when every row from the cursor to the end of the
 *                   block matches (the cursor is not moved in that case and
 *                   the caller is expected to drop the block), false otherwise.
 *                   Always written, so a caller looping over several blocks
 *                   never sees a stale value.
 * @return TSDB_CODE_SUCCESS
 */
int32_t mAsofGreaterSkipEqRows(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk) {
  SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot);

  if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
    *wholeBlk = false;
    return TSDB_CODE_SUCCESS;
  }

  char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
  if (timestamp != *(int64_t*)pEndVal) {
    for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
      char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
      if (timestamp == *(int64_t*)pNextVal) {
        continue;
      }

      /* Found a row with another timestamp inside this block. */
      *wholeBlk = false;
      return TSDB_CODE_SUCCESS;
    }
  }

  *wholeBlk = true;

  return TSDB_CODE_SUCCESS;
}
/*
 * Skip every build row whose timestamp equals `timestamp`, across cached
 * blocks and, when those run out, across further downstream blocks.
 * Returns as soon as a row with a different timestamp is found or the build
 * downstream is exhausted (dsFetchDone is then set).
 */
int32_t mAsofGreaterSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) {
SMJoinWinCache* cache = &pCtx->cache;
/* NOTE(review): grpNum is computed but never used in this function. */
int32_t grpNum = taosArrayGetSize(cache->grps);
SMJoinTableCtx* pTable = pCtx->pJoin->build;
bool wholeBlk = false;
do {
/* Inner loop: walk the blocks that are already cached. */
do {
MJ_ERR_RET(mAsofGreaterSkipEqRows(pTable, timestamp, &wholeBlk));
if (!wholeBlk) {
return TSDB_CODE_SUCCESS;
}
/* The rest of the block matched: drop it from the cache and switch the
   table to the next cached group, if any. */
MJOIN_POP_TB_BLK(cache);
MJOIN_RESTORE_TB_BLK(cache, pTable);
} while (!MJOIN_BUILD_TB_ROWS_DONE(pTable));
/* Cached blocks are used up: fetch the next block and cache it. */
pTable->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pTable->downStreamIdx);
qDebug("%s merge join %s table got block to skip eq ts, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);
pTable->blkRowIdx = 0;
if (NULL == pTable->blk) {
pTable->dsFetchDone = true;
return TSDB_CODE_SUCCESS;
}
MJOIN_PUSH_BLK_TO_CACHE(cache, pTable->blk);
} while (true);
/* Not reached: the loop above only exits through return statements. */
return TSDB_CODE_SUCCESS;
}
/*
 * For an equal-timestamp probe group: when equal rows are not part of the
 * join result and this is not the last build group, first skip the build rows
 * carrying that timestamp; then fill and dump the cache.
 */
int32_t mAsofGreaterUpdateDumpEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) {
  bool skipEqRows = !(pCtx->asofEqRow || lastBuildGrp);

  if (skipEqRows) {
    MJ_ERR_RET(mAsofGreaterSkipAllEqRows(pCtx, timestamp));
  }

  return mAsofGreaterFillDumpGrpCache(pCtx, lastBuildGrp);
}
/*
 * Process a probe group whose timestamp equals the build timestamp: build the
 * equal probe group into pCtx->probeGrp, restart the cache dump position and
 * update/dump the cache for it.
 *
 * @param timestamp     the shared timestamp
 * @param lastBuildGrp  true when the build side must not be advanced/refilled
 */
int32_t mAsofGreaterProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) {
  SMJoinOperatorInfo* pJoin = pCtx->pJoin;

  pCtx->lastEqGrp = true;
  pCtx->cache.grpIdx = 0;

  /* mAsofJoinBuildEqGrp() takes (table, ts, wholeBlk, grp); the whole-block
   * flag is not needed here, so pass NULL and give the group as 4th argument. */
  MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, NULL, &pCtx->probeGrp));

  return mAsofGreaterUpdateDumpEqRows(pCtx, timestamp, lastBuildGrp);
}
/*
 * Collect the run of probe rows that sort before the current build timestamp
 * into pCtx->probeGrp, restart the cache dump position, then fill and dump
 * the cache against them.
 *
 * @param pCol     primary timestamp column of the current probe block
 * @param probeTs  in/out: updated to the timestamp of each probe row inspected
 * @param buildTs  in: current build timestamp
 */
int32_t mAsofGreaterProcessLowerGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) {
  SMJoinTableCtx* pProbe = pJoin->probe;
  SMJoinGrpRows*  pProbeGrp = &pCtx->probeGrp;

  pCtx->lastEqGrp = false;

  pProbeGrp->beginIdx = pProbe->blkRowIdx;
  pProbeGrp->readIdx = pProbeGrp->beginIdx;
  pProbeGrp->endIdx = pProbeGrp->beginIdx;

  for (++pProbe->blkRowIdx; pProbe->blkRowIdx < pProbe->blk->info.rows; ++pProbe->blkRowIdx) {
    MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pProbe);
    if (!(PROBE_TS_LOWER(pCtx->ascTs, *probeTs, *buildTs))) {
      break;
    }
    pProbeGrp->endIdx = pProbe->blkRowIdx;
  }

  pCtx->cache.grpIdx = 0;

  return mAsofGreaterFillDumpGrpCache(pCtx, false);
}
/*
 * Discard cached build rows that the current probe row has already passed
 * (probe timestamp sorts after the build timestamp). Each discarded row
 * decrements cache.rowNum; a block that is consumed completely is popped from
 * the cache and the build table is re-pointed at the next cached group.
 *
 * @param pCol     primary timestamp column of the current build block; it is
 *                 re-fetched here whenever the build block changes
 * @param probeTs  in: current probe timestamp
 * @param buildTs  in/out: updated to the timestamp of each build row inspected
 * @return TSDB_CODE_SUCCESS
 */
int32_t mAsofGreaterProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) {
  do {
    /* After a pop/restore the build table points at a different block, so the
     * timestamp column has to be looked up again for the current block. */
    pCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot);

    MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pJoin->build);
    /* Extra parentheses keep the negation applied to the whole predicate
     * regardless of how the macro body is parenthesised. */
    if (!(PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs))) {
      break;
    }

    pCtx->cache.rowNum--;

    while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
      MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pJoin->build);
      if (PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) {
        pCtx->cache.rowNum--;
        continue;
      }

      return TSDB_CODE_SUCCESS;
    }

    /* Whole block passed: drop it and continue with the next cached group. */
    MJOIN_POP_TB_BLK(&pCtx->cache);
    MJOIN_RESTORE_TB_BLK(&pCtx->cache, pJoin->build);
  } while (!MJOIN_BUILD_TB_ROWS_DONE(pJoin->build));

  return TSDB_CODE_SUCCESS;
}
/*
 * Make sure the probe side (and, when needed, the build side) has a current
 * block, register a newly obtained build block in the cache, and bind both
 * blocks to the context's row groups.
 * Returns false when the probe side is exhausted (the operator is then marked
 * done), true otherwise.
 */
static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool buildGot = false;
do {
if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
}
if (!probeGot) {
mJoinSetDone(pOperator);
return false;
}
/* Whole-block skip: when the current probe timestamp is above the last
   timestamp of the build block, mark the build block as consumed and loop to
   retrieve the next one.
   NOTE(review): the comparison is a plain '>' and does not consult
   pCtx->ascTs -- confirm this is intended for descending input. */
if (buildGot && pCtx->asofGreaterRow) {
SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot);
SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot);
if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) {
pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
continue;
}
}
break;
} while (true);
/* A build block obtained in the last iteration is added to the cache, and the
   build table is re-pointed at the first cached group. */
if (buildGot) {
MJOIN_PUSH_BLK_TO_CACHE(&pCtx->cache, pJoin->build->blk);
MJOIN_RESTORE_TB_BLK(&pCtx->cache, pJoin->build);
}
pCtx->probeGrp.blk = pJoin->probe->blk;
pCtx->buildGrp.blk = pJoin->build->blk;
return true;
}
/*
 * Resume a dump that was interrupted because the result block was full;
 * simply continues the cache dump from the saved positions.
 */
int32_t mAsofGreaterHandleGrpRemains(SMJoinWindowCtx* pCtx) {
return mAsofGreaterDumpGrpCache(pCtx);
}
SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoin = pOperator->info;
SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
int32_t code = TSDB_CODE_SUCCESS;
int64_t probeTs = 0;
int64_t buildTs = 0;
SColumnInfoData* pBuildCol = NULL;
SColumnInfoData* pProbeCol = NULL;
blockDataCleanup(pCtx->finBlk);
if (pCtx->grpRemains) {
MJ_ERR_JRET(mAsofGreaterHandleGrpRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
pCtx->grpRemains = false;
}
do {
if (!mAsofGreaterRetrieve(pOperator, pJoin, pCtx)) {
break;
}
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
if (probeTs == pCtx->lastTs) {
MJ_ERR_JRET(mAsofGreaterProcessEqualGrp(pCtx, probeTs, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
continue;
} else {
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
}
}
while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
if (probeTs == buildTs) {
pCtx->lastTs = probeTs;
MJ_ERR_JRET(mAsofGreaterProcessEqualGrp(pCtx, probeTs, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
continue;
}
if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) {
MJ_ERR_JRET(mAsofGreaterProcessLowerGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs));
} else {
MJ_ERR_JRET(mAsofGreaterProcessGreaterGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs));
}
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
}
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) {
pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx;
pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx;
pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1;
MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true));
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
MJ_ERR_JRET(mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
@ -1841,15 +2530,15 @@ int32_t mJoinInitWinCache(SMJoinWinCache* pCache, SMJoinOperatorInfo* pJoin, SMJ
pCache->pageLimit = MJOIN_BLK_SIZE_LIMIT;
pCache->colNum = pJoin->build->finNum;
pCache->blk = createOneDataBlock(pCtx->finBlk, false);
if (NULL == pCache->blk) {
pCache->outBlk = createOneDataBlock(pCtx->finBlk, false);
if (NULL == pCache->outBlk) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCache->blk->info.capacity = pCtx->jLimit;
pCache->outBlk->info.capacity = pCtx->jLimit;
SMJoinTableCtx* build = pJoin->build;
for (int32_t i = 0; i < pCache->colNum; ++i) {
SColumnInfoData* pCol = taosArrayGet(pCache->blk->pDataBlock, build->finCols[i].dstSlot);
SColumnInfoData* pCol = taosArrayGet(pCache->outBlk->pDataBlock, build->finCols[i].dstSlot);
doEnsureCapacity(pCol, NULL, pCtx->jLimit, false);
}
@ -1886,6 +2575,12 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ
// Basic merge-context setup: remember the owning join operator, start lastEqTs at the
// INT64_MIN sentinel, and set hashCan when the probe table has at least one key (keyNum > 0).
pCtx->pJoin = pJoin;
pCtx->lastEqTs = INT64_MIN;
pCtx->hashCan = pJoin->probe->keyNum > 0;
// An ASOF join that reaches the merge context takes its join limit from the plan node's
// pJLimit (1 when the node carries none) and is then relabelled as an OUTER join.
// NOTE(review): after this assignment pJoin->subType no longer reports JOIN_STYPE_ASOF;
// confirm that no check executed later still depends on seeing the ASOF sub-type.
if (JOIN_STYPE_ASOF == pJoin->subType) {
pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1;
pJoin->subType = JOIN_STYPE_OUTER;
} else {
// Every other sub-type stores -1 (presumably meaning "no join limit" -- TODO confirm).
pCtx->jLimit = -1;
}
if (pJoinNode->node.inputTsOrder != ORDER_DESC) {
pCtx->ascTs = true;

View File

@ -419,7 +419,7 @@ int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool app
}
int32_t mJoinNonEqCart(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp) {
int32_t mJoinNonEqCart(SMJoinCommonCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp) {
pCtx->lastEqGrp = false;
pCtx->lastProbeGrp = probeGrp;
@ -596,7 +596,7 @@ int32_t mJoinProcessUnreachGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColum
while (++pTb->blkRowIdx < pTb->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pTb);
if (PROBE_TS_UNREACH(pCtx->ascTs, *probeTs, *buildTs)) {
if (PROBE_TS_LOWER(pCtx->ascTs, *probeTs, *buildTs)) {
pCtx->probeNEqGrp.endIdx = pTb->blkRowIdx;
continue;
}
@ -607,7 +607,7 @@ int32_t mJoinProcessUnreachGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColum
return mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true);
}
int32_t mJoinProcessOverGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) {
int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) {
pCtx->buildNEqGrp.blk = pTb->blk;
pCtx->buildNEqGrp.beginIdx = pTb->blkRowIdx;
pCtx->buildNEqGrp.readIdx = pCtx->buildNEqGrp.beginIdx;
@ -615,7 +615,7 @@ int32_t mJoinProcessOverGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnIn
while (++pTb->blkRowIdx < pTb->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pTb);
if (PROBE_TS_OVER(pCtx->ascTs, *probeTs, *buildTs)) {
if (PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) {
pCtx->buildNEqGrp.endIdx = pTb->blkRowIdx;
continue;
}
@ -765,6 +765,9 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi
pTable->noKeepEqGrpRows = (JOIN_STYPE_ANTI == pJoin->subType && NULL == pJoin->pFPreFilter);
pTable->multiEqGrpRows = !((JOIN_STYPE_SEMI == pJoin->subType || JOIN_STYPE_ANTI == pJoin->subType) && NULL == pJoin->pFPreFilter);
pTable->multiRowsGrp = !((JOIN_STYPE_SEMI == pJoin->subType || JOIN_STYPE_ANTI == pJoin->subType) && NULL == pJoin->pPreFilter);
// ASOF join: bound the rows kept from equal-timestamp groups of this table by the plan
// node's pJLimit value, falling back to 1 when no pJLimit is present.
// eqRowLimit is not assigned here for other sub-types (presumably it stays 0 -- TODO confirm).
if (JOIN_STYPE_ASOF == pJoin->subType) {
pTable->eqRowLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1;
}
} else {
pTable->multiEqGrpRows = true;
}
@ -808,8 +811,9 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin
}
static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
if (JOIN_STYPE_ASOF == pJoin->subType || JOIN_STYPE_WIN == pJoin->subType) {
//return mJoinInitWindowCtx(pJoin, pJoinNode);
// Use the window context for WIN joins and for ASOF joins whose operator type satisfies
// ASOF_LOWER_ROW_INCLUDED or ASOF_GREATER_ROW_INCLUDED; any other join (including the
// remaining ASOF operator types) falls through to the merge context below.
if ((JOIN_STYPE_ASOF == pJoin->subType && (ASOF_LOWER_ROW_INCLUDED(pJoinNode->asofOpType) || ASOF_GREATER_ROW_INCLUDED(pJoinNode->asofOpType)))
|| JOIN_STYPE_WIN == pJoin->subType) {
return mJoinInitWindowCtx(pJoin, pJoinNode);
}
return mJoinInitMergeCtx(pJoin, pJoinNode);
@ -880,6 +884,7 @@ int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t
// Resets the equal-group bookkeeping of a table context before new groups are built:
// zeroes the group row total, the current group index and the running equal-row count,
// hands createdBlks to mJoinDestroyCreatedBlks, and empties the eqGrps array.
void mJoinResetForBuildTable(SMJoinTableCtx* pTable) {
pTable->grpTotalRows = 0;
pTable->grpIdx = 0;
// eqRowNum is the counter compared against eqRowLimit when equal groups are built.
pTable->eqRowNum = 0;
mJoinDestroyCreatedBlks(pTable->createdBlks);
taosArrayClear(pTable->eqGrps);
if (pTable->rowBitmapSize > 0) {
@ -900,6 +905,7 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol
mJoinResetForBuildTable(pTable);
}
bool keepGrp = true;
pGrp = taosArrayReserve(pTable->eqGrps, 1);
pGrp->beginIdx = pTable->blkRowIdx++;
@ -927,6 +933,14 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol
// Trim the group that ends inside the current block according to the table's row policy.
if (!pTable->multiEqGrpRows) {
// Single-row policy: the group collapses to its first row.
pGrp->endIdx = pGrp->beginIdx;
} else if (0 == pTable->eqRowLimit) {
// eqRowLimit == 0 means no cap: keep the whole [beginIdx, endIdx] range untouched.
} else if (pTable->eqRowLimit == pTable->eqRowNum) {
// The cap was already reached by rows counted earlier: mark this group as not kept.
keepGrp = false;
} else {
// Keep only as many rows as the remaining budget (eqRowLimit - eqRowNum) allows,
// shrink the group accordingly and add the kept rows to the running count.
int64_t rowNum = TMIN(pGrp->endIdx - pGrp->beginIdx + 1, pTable->eqRowLimit - pTable->eqRowNum);
pGrp->endIdx = pGrp->beginIdx + rowNum - 1;
pTable->eqRowNum += rowNum;
}
goto _return;
@ -936,28 +950,47 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol
if (wholeBlk && (pTable->multiEqGrpRows || restart)) {
*wholeBlk = true;
if (pTable->noKeepEqGrpRows) {
if (pTable->noKeepEqGrpRows || !keepGrp) {
goto _return;
}
if (0 == pGrp->beginIdx && pTable->multiEqGrpRows) {
if (0 == pGrp->beginIdx && pTable->multiEqGrpRows && 0 == pTable->eqRowLimit) {
pGrp->blk = createOneDataBlock(pTable->blk, true);
taosArrayPush(pTable->createdBlks, &pGrp->blk);
} else {
if (!pTable->multiEqGrpRows) {
pGrp->endIdx = pGrp->beginIdx;
}
pGrp->blk = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, pGrp->endIdx - pGrp->beginIdx + 1);
pGrp->endIdx -= pGrp->beginIdx;
pGrp->beginIdx = 0;
pGrp->readIdx = 0;
// Decide how many rows of this group are copied out of the current block; 0 means none.
int64_t rowNum = 0;
if (!pTable->multiEqGrpRows) {
// Single-row policy: copy just the first row of the group.
rowNum = 1;
pGrp->endIdx = pGrp->beginIdx;
} else if (0 == pTable->eqRowLimit) {
// No cap configured: copy the full [beginIdx, endIdx] range.
rowNum = pGrp->endIdx - pGrp->beginIdx + 1;
} else if (pTable->eqRowLimit == pTable->eqRowNum) {
// Cap already reached: the group is dropped and rowNum stays 0.
keepGrp = false;
} else {
// Copy at most the remaining budget (eqRowLimit - eqRowNum) and shrink the group to match.
rowNum = TMIN(pGrp->endIdx - pGrp->beginIdx + 1, pTable->eqRowLimit - pTable->eqRowNum);
pGrp->endIdx = pGrp->beginIdx + rowNum - 1;
}
if (keepGrp && rowNum > 0) {
// Count the kept rows, copy them with blockDataExtractBlock, then rebase the group's
// indexes so they are relative to the extracted block (beginIdx/readIdx become 0).
// The extracted block is recorded in createdBlks.
// NOTE(review): the blockDataExtractBlock result is stored and pushed without a NULL
// check -- confirm whether it can fail here.
pTable->eqRowNum += rowNum;
pGrp->blk = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, rowNum);
pGrp->endIdx -= pGrp->beginIdx;
pGrp->beginIdx = 0;
pGrp->readIdx = 0;
taosArrayPush(pTable->createdBlks, &pGrp->blk);
}
}
taosArrayPush(pTable->createdBlks, &pGrp->blk);
}
_return:
if (pTable->noKeepEqGrpRows || (!pTable->multiEqGrpRows && !restart)) {
if (pTable->noKeepEqGrpRows || !keepGrp || (!pTable->multiEqGrpRows && !restart)) {
taosArrayPop(pTable->eqGrps);
} else {
pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1;