enh: support semi/anti join

This commit is contained in:
dapan1121 2024-01-10 18:56:49 +08:00
parent 0ef7f089be
commit 5f0440a42c
4 changed files with 1699 additions and 102 deletions

View File

@ -19,9 +19,9 @@
extern "C" {
#endif
#if 0
#if 1
#define MJOIN_DEFAULT_BLK_ROWS_NUM 2 //4096
#define MJOIN_HJOIN_CART_THRESHOLD 16
#define MJOIN_HJOIN_CART_THRESHOLD 2
#define MJOIN_BLK_SIZE_LIMIT 0 //10485760
#define MJOIN_ROW_BITMAP_SIZE (2 * 1048576)
#else
@ -114,6 +114,8 @@ typedef struct SMJoinTableCtx {
int64_t grpTotalRows;
int32_t grpIdx;
bool noKeepEqGrpRows;
bool multiEqGrpRows;
SArray* eqGrps;
SArray* createdBlks;
@ -122,6 +124,7 @@ typedef struct SMJoinTableCtx {
int32_t grpArrayIdx;
SArray* pGrpArrays;
bool multiRowsGrp;
int32_t grpRowIdx;
SArray* pHashCurGrp;
SMJoinHashGrpRows* pHashGrpRows;
@ -168,9 +171,9 @@ typedef struct SMJoinMergeCtx {
joinCartFp mergeCartFp;
} SMJoinMergeCtx;
typedef struct SMJoinWinCtx {
typedef struct SMJoinWindowCtx {
} SMJoinWinCtx;
} SMJoinWindowCtx;
typedef struct SMJoinFlowFlags {
@ -183,8 +186,8 @@ typedef struct SMJoinFlowFlags {
typedef struct SMJoinCtx {
SMJoinFlowFlags* pFlags;
union {
SMJoinMergeCtx mergeCtx;
SMJoinWinCtx winCtx;
SMJoinMergeCtx mergeCtx;
SMJoinWindowCtx windowCtx;
};
} SMJoinCtx;
@ -279,6 +282,8 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ
SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator);
SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator);
SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator);
SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator);
SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator);
bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb);
void mJoinSetDone(SOperatorInfo* pOperator);
bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen);
@ -299,6 +304,8 @@ int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo
int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t *rowBitmapOffset);
int32_t mJoinProcessUnreachGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs);
int32_t mJoinProcessOverGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs);
int32_t mJoinFilterAndKeepSingleRow(SSDataBlock* pBlock, SFilterInfo* pFilterInfo);
int32_t mJoinFilterAndNoKeepRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo);
#ifdef __cplusplus
}

View File

@ -642,7 +642,7 @@ static bool mInnerJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJo
buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
}
if (!probeGot || !buildGot) {
if (!probeGot) {
mJoinSetDone(pOperator);
return false;
}
@ -684,11 +684,14 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) {
return pCtx->finBlk;
}
if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) || MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
continue;
} else {
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
}
}
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
} else if (MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
mJoinSetDone(pOperator);
break;
}
do {
@ -706,7 +709,9 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) {
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
continue;
} else if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) {
}
if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) {
if (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
continue;
@ -759,8 +764,6 @@ static int32_t mFullJoinMergeCart(SMJoinMergeCtx* pCtx) {
return (NULL == pCtx->pJoin->pFPreFilter) ? mOuterJoinMergeFullCart(pCtx) : mOuterJoinMergeSeqCart(pCtx);
}
const uint8_t lowest_bit_bitmap[] = {32, 7, 6, 32, 5, 3, 32, 0, 4, 1, 2};
static FORCE_INLINE int32_t mFullJoinOutputHashRow(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows, int32_t idx) {
SMJoinGrpRows grp = {0};
SMJoinRowPos* pPos = taosArrayGet(pGrpRows->pRows, idx);
@ -785,6 +788,7 @@ static int32_t mFullJoinOutputHashGrpRows(SMJoinMergeCtx* pCtx, SMJoinHashGrpRow
}
static int32_t mFullJoinHandleHashGrpRemains(SMJoinMergeCtx* pCtx) {
static const uint8_t lowest_bit_bitmap[] = {32, 7, 6, 32, 5, 3, 32, 0, 4, 1, 2};
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinNMatchCtx* pNMatch = &build->nMatchCtx;
if (NULL == pNMatch->pGrp) {
@ -891,6 +895,7 @@ static int32_t mFullJoinOutputMergeGrpRows(SMJoinMergeCtx* pCtx, SMJoinGrpRows*
static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) {
static const uint8_t lowest_bit_bitmap[] = {32, 7, 6, 32, 5, 3, 32, 0, 4, 1, 2};
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinNMatchCtx* pNMatch = &build->nMatchCtx;
bool grpDone = false;
@ -906,7 +911,7 @@ static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) {
}
if (pGrpRows->rowMatchNum <= 0 || pGrpRows->allRowsNMatch) {
if (pGrpRows->rowMatchNum <= 0) {
if (!pGrpRows->allRowsNMatch) {
pGrpRows->allRowsNMatch = true;
pNMatch->rowIdx = pGrpRows->beginIdx;
}
@ -952,10 +957,14 @@ static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) {
break;
}
}
if (BLK_IS_FULL(pCtx->finBlk)) {
break;
}
}
if (BLK_IS_FULL(pCtx->finBlk)) {
if (pNMatch->bitIdx == bitBytes) {
if (pNMatch->bitIdx >= bitBytes) {
++pNMatch->grpIdx;
pNMatch->bitIdx = 0;
}
@ -1132,6 +1141,583 @@ _return:
}
static int32_t mSemiJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, SMJoinGrpRows* probeGrp) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
do {
blockDataCleanup(pCtx->midBlk);
mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build);
if (pCtx->midBlk->info.rows > 0) {
MJ_ERR_RET(mJoinFilterAndKeepSingleRow(pCtx->midBlk, pCtx->pJoin->pPreFilter));
}
if (pCtx->midBlk->info.rows <= 0) {
if (build->grpRowIdx < 0) {
break;
}
continue;
}
ASSERT(1 == pCtx->midBlk->info.rows);
MJ_ERR_RET(mJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk));
ASSERT(false == pCtx->midRemains);
break;
} while (true);
return TSDB_CODE_SUCCESS;
}
static int32_t mSemiJoinHashSeqCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
size_t bufLen = 0;
int32_t probeEndIdx = probeGrp->endIdx;
for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); probeGrp->readIdx++) {
if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
continue;
}
void* pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
if (NULL == pGrp) {
continue;
}
build->pHashCurGrp = *(SArray**)pGrp;
build->grpRowIdx = 0;
probeGrp->endIdx = probeGrp->readIdx;
MJ_ERR_RET(mSemiJoinHashGrpCartFilter(pCtx, probeGrp));
probeGrp->endIdx = probeEndIdx;
}
pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
return TSDB_CODE_SUCCESS;
}
static int32_t mSemiJoinHashFullCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
size_t bufLen = 0;
for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) {
if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
continue;
}
void* pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
if (NULL == pGrp) {
continue;
}
build->pHashCurGrp = *(SArray**)pGrp;
ASSERT(1 == taosArrayGetSize(build->pHashCurGrp));
build->grpRowIdx = 0;
mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build);
ASSERT(build->grpRowIdx < 0);
}
pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
return TSDB_CODE_SUCCESS;
}
static int32_t mSemiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
SMJoinGrpRows* buildGrp = NULL;
int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
int32_t probeEndIdx = probeGrp->endIdx;
int32_t rowsLeft = pCtx->midBlk->info.capacity;
do {
for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk);
++probeGrp->readIdx, probeGrp->endIdx = probeEndIdx, build->grpIdx = 0) {
probeGrp->endIdx = probeGrp->readIdx;
rowsLeft = pCtx->midBlk->info.capacity;
blockDataCleanup(pCtx->midBlk);
for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) {
buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
buildGrp->readIdx = buildGrp->beginIdx;
continue;
}
int32_t buildEndIdx = buildGrp->endIdx;
buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
ASSERT(buildGrp->endIdx >= buildGrp->readIdx);
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
buildGrp->readIdx += rowsLeft;
buildGrp->endIdx = buildEndIdx;
rowsLeft = 0;
break;
}
if (pCtx->midBlk->info.rows > 0) {
MJ_ERR_RET(mJoinFilterAndKeepSingleRow(pCtx->midBlk, pCtx->pJoin->pFPreFilter));
}
if (0 == pCtx->midBlk->info.rows) {
if (build->grpIdx == buildGrpNum) {
continue;
}
} else {
ASSERT(1 == pCtx->midBlk->info.rows);
MJ_ERR_RET(mJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk));
ASSERT(false == pCtx->midRemains);
if (build->grpIdx == buildGrpNum) {
continue;
}
buildGrp->readIdx = buildGrp->beginIdx;
continue;
}
//need break
probeGrp->endIdx = probeEndIdx;
break;
}
if (GRP_DONE(probeGrp) || BLK_IS_FULL(pCtx->finBlk)) {
break;
}
} while (true);
pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
return TSDB_CODE_SUCCESS;
}
static int32_t mSemiJoinMergeFullCart(SMJoinMergeCtx* pCtx) {
int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, 0);
int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
int32_t probeEndIdx = probeGrp->endIdx;
ASSERT(1 == taosArrayGetSize(build->eqGrps));
ASSERT(buildGrp->beginIdx == buildGrp->endIdx);
if (probeRows <= rowsLeft) {
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
pCtx->grpRemains = false;
return TSDB_CODE_SUCCESS;
}
probeGrp->endIdx = probeGrp->readIdx + rowsLeft - 1;
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
probeGrp->readIdx = probeGrp->endIdx + 1;
probeGrp->endIdx = probeEndIdx;
pCtx->grpRemains = true;
return TSDB_CODE_SUCCESS;
}
static int32_t mSemiJoinHashCart(SMJoinMergeCtx* pCtx) {
return (NULL == pCtx->pJoin->pPreFilter) ? mSemiJoinHashFullCart(pCtx) : mSemiJoinHashSeqCart(pCtx);
}
static int32_t mSemiJoinMergeCart(SMJoinMergeCtx* pCtx) {
return (NULL == pCtx->pJoin->pFPreFilter) ? mSemiJoinMergeFullCart(pCtx) : mSemiJoinMergeSeqCart(pCtx);
}
static FORCE_INLINE int32_t mSemiJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx);
}
SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoin = pOperator->info;
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
int32_t code = TSDB_CODE_SUCCESS;
int64_t probeTs = 0;
int64_t buildTs = 0;
SColumnInfoData* pBuildCol = NULL;
SColumnInfoData* pProbeCol = NULL;
blockDataCleanup(pCtx->finBlk);
if (pCtx->grpRemains) {
MJ_ERR_JRET(mSemiJoinHandleGrpRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
pCtx->grpRemains = false;
}
do {
if (!mInnerJoinRetrieve(pOperator, pJoin, pCtx)) {
break;
}
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
if (probeTs == pCtx->lastEqTs) {
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) || MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
continue;
}
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
} else if (MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
mJoinSetDone(pOperator);
break;
}
do {
if (probeTs == buildTs) {
pCtx->lastEqTs = probeTs;
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) || MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
break;
}
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
continue;
}
if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) {
if (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
continue;
}
} else {
if (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build);
continue;
}
}
break;
} while (true);
} while (true);
_return:
if (code) {
pJoin->errCode = code;
return NULL;
}
return pCtx->finBlk;
}
static FORCE_INLINE int32_t mAntiJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
if (pCtx->lastEqGrp) {
return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx);
}
return mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true);
}
static int32_t mAntiJoinHashFullCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
size_t bufLen = 0;
int32_t probeEndIdx = probeGrp->endIdx;
for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) {
if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
probeGrp->endIdx = probeGrp->readIdx;
MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
probeGrp->endIdx = probeEndIdx;
continue;
}
void* pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
if (NULL == pGrp) {
probeGrp->endIdx = probeGrp->readIdx;
MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
probeGrp->endIdx = probeEndIdx;
}
}
pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
return TSDB_CODE_SUCCESS;
}
static int32_t mAntiJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, SMJoinGrpRows* probeGrp) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
do {
blockDataCleanup(pCtx->midBlk);
mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build);
if (pCtx->midBlk->info.rows > 0) {
MJ_ERR_RET(mJoinFilterAndNoKeepRows(pCtx->midBlk, pCtx->pJoin->pPreFilter));
}
if (pCtx->midBlk->info.rows) {
break;
}
if (build->grpRowIdx < 0) {
MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
break;
}
continue;
} while (true);
return TSDB_CODE_SUCCESS;
}
static int32_t mAntiJoinHashSeqCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
size_t bufLen = 0;
int32_t probeEndIdx = probeGrp->endIdx;
for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); probeGrp->readIdx++) {
if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
probeGrp->endIdx = probeGrp->readIdx;
MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
probeGrp->endIdx = probeEndIdx;
continue;
}
void* pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
if (NULL == pGrp) {
probeGrp->endIdx = probeGrp->readIdx;
MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
probeGrp->endIdx = probeEndIdx;
continue;
}
build->pHashCurGrp = *(SArray**)pGrp;
build->grpRowIdx = 0;
probeGrp->endIdx = probeGrp->readIdx;
MJ_ERR_RET(mAntiJoinHashGrpCartFilter(pCtx, probeGrp));
probeGrp->endIdx = probeEndIdx;
}
pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
return TSDB_CODE_SUCCESS;
}
static int32_t mAntiJoinMergeFullCart(SMJoinMergeCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
static int32_t mAntiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
SMJoinGrpRows* buildGrp = NULL;
int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
int32_t probeEndIdx = probeGrp->endIdx;
int32_t rowsLeft = pCtx->midBlk->info.capacity;
do {
for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk);
++probeGrp->readIdx, probeGrp->endIdx = probeEndIdx, build->grpIdx = 0) {
probeGrp->endIdx = probeGrp->readIdx;
rowsLeft = pCtx->midBlk->info.capacity;
blockDataCleanup(pCtx->midBlk);
for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) {
buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
buildGrp->readIdx = buildGrp->beginIdx;
continue;
}
int32_t buildEndIdx = buildGrp->endIdx;
buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
ASSERT(buildGrp->endIdx >= buildGrp->readIdx);
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
buildGrp->readIdx += rowsLeft;
buildGrp->endIdx = buildEndIdx;
rowsLeft = 0;
break;
}
if (pCtx->midBlk->info.rows > 0) {
MJ_ERR_RET(mJoinFilterAndNoKeepRows(pCtx->midBlk, pCtx->pJoin->pFPreFilter));
}
if (pCtx->midBlk->info.rows > 0) {
if (build->grpIdx < buildGrpNum) {
buildGrp->readIdx = buildGrp->beginIdx;
}
continue;
}
if (build->grpIdx >= buildGrpNum) {
MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
continue;
}
//need break
probeGrp->endIdx = probeEndIdx;
break;
}
if (GRP_DONE(probeGrp) || BLK_IS_FULL(pCtx->finBlk)) {
break;
}
} while (true);
pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
return TSDB_CODE_SUCCESS;
}
static int32_t mAntiJoinHashCart(SMJoinMergeCtx* pCtx) {
return (NULL == pCtx->pJoin->pPreFilter) ? mAntiJoinHashFullCart(pCtx) : mAntiJoinHashSeqCart(pCtx);
}
static int32_t mAntiJoinMergeCart(SMJoinMergeCtx* pCtx) {
return (NULL == pCtx->pJoin->pFPreFilter) ? mAntiJoinMergeFullCart(pCtx) : mAntiJoinMergeSeqCart(pCtx);
}
SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoin = pOperator->info;
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
int32_t code = TSDB_CODE_SUCCESS;
int64_t probeTs = 0;
int64_t buildTs = 0;
SColumnInfoData* pBuildCol = NULL;
SColumnInfoData* pProbeCol = NULL;
blockDataCleanup(pCtx->finBlk);
if (pCtx->grpRemains) {
MJ_ERR_JRET(mAntiJoinHandleGrpRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
pCtx->grpRemains = false;
}
do {
if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) {
break;
}
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
if (probeTs == pCtx->lastEqTs) {
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
continue;
} else {
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
}
}
while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
if (probeTs == buildTs) {
pCtx->lastEqTs = probeTs;
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
} else if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) {
MJ_ERR_JRET(mJoinProcessUnreachGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
} else {
while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build);
if (PROBE_TS_OVER(pCtx->ascTs, probeTs, buildTs)) {
continue;
}
break;
}
}
}
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) {
pCtx->probeNEqGrp.blk = pJoin->probe->blk;
pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
pCtx->probeNEqGrp.endIdx = pJoin->probe->blk->info.rows - 1;
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
MJ_ERR_JRET(mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
}
} while (true);
_return:
if (code) {
pJoin->errCode = code;
return NULL;
}
return pCtx->finBlk;
}
int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
return TSDB_CODE_SUCCESS;
}
int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
@ -1151,7 +1737,7 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ
blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity);
}
pCtx->blkThreshold = pCtx->finBlk->info.capacity * 0.5;
pCtx->blkThreshold = pCtx->finBlk->info.capacity * 0.9;
switch (pJoin->joinType) {
case JOIN_TYPE_INNER:
@ -1159,10 +1745,25 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ
pCtx->mergeCartFp = (joinCartFp)mInnerJoinMergeCart;
break;
case JOIN_TYPE_LEFT:
case JOIN_TYPE_RIGHT:
pCtx->hashCartFp = (joinCartFp)mLeftJoinHashCart;
pCtx->mergeCartFp = (joinCartFp)mLeftJoinMergeCart;
case JOIN_TYPE_RIGHT: {
switch (pJoin->subType) {
case JOIN_STYPE_OUTER:
pCtx->hashCartFp = (joinCartFp)mLeftJoinHashCart;
pCtx->mergeCartFp = (joinCartFp)mLeftJoinMergeCart;
break;
case JOIN_STYPE_SEMI:
pCtx->hashCartFp = (joinCartFp)mSemiJoinHashCart;
pCtx->mergeCartFp = (joinCartFp)mSemiJoinMergeCart;
break;
case JOIN_STYPE_ANTI:
pCtx->hashCartFp = (joinCartFp)mAntiJoinHashCart;
pCtx->mergeCartFp = (joinCartFp)mAntiJoinMergeCart;
break;
default:
break;
}
break;
}
case JOIN_TYPE_FULL:
pCtx->hashCartFp = (joinCartFp)mFullJoinHashCart;
pCtx->mergeCartFp = (joinCartFp)mFullJoinMergeCart;

View File

@ -28,6 +28,155 @@
#include "mergejoin.h"
void mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) {
// int32_t totalRows = pBlock->info.rows;
int32_t bmLen = BitmapLen(totalRows);
char* pBitmap = NULL;
int32_t maxRows = 0;
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
// it is a reserved column for scalar function, and no data in this column yet.
if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) {
continue;
}
int32_t numOfRows = 0;
if (IS_VAR_DATA_TYPE(pDst->info.type)) {
int32_t j = 0;
pDst->varmeta.length = 0;
while (j < totalRows) {
if (pBoolList[j] == 0) {
j += 1;
continue;
}
if (colDataIsNull_var(pDst, j)) {
colDataSetNull_var(pDst, numOfRows);
} else {
// fix address sanitizer error. p1 may point to memory that will change during realloc of colDataSetVal, first
// copy it to p2
char* p1 = colDataGetVarData(pDst, j);
int32_t len = 0;
if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
len = getJsonValueLen(p1);
} else {
len = varDataTLen(p1);
}
char* p2 = taosMemoryMalloc(len);
memcpy(p2, p1, len);
colDataSetVal(pDst, numOfRows, p2, false);
taosMemoryFree(p2);
}
numOfRows += 1;
j += 1;
break;
}
if (maxRows < numOfRows) {
maxRows = numOfRows;
}
} else {
if (pBitmap == NULL) {
pBitmap = taosMemoryCalloc(1, bmLen);
}
memcpy(pBitmap, pDst->nullbitmap, bmLen);
memset(pDst->nullbitmap, 0, bmLen);
int32_t j = 0;
switch (pDst->info.type) {
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_TIMESTAMP:
while (j < totalRows) {
if (pBoolList[j] == 0) {
j += 1;
continue;
}
if (colDataIsNull_f(pBitmap, j)) {
colDataSetNull_f(pDst->nullbitmap, numOfRows);
} else {
((int64_t*)pDst->pData)[numOfRows] = ((int64_t*)pDst->pData)[j];
}
numOfRows += 1;
j += 1;
break;
}
break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
while (j < totalRows) {
if (pBoolList[j] == 0) {
j += 1;
continue;
}
if (colDataIsNull_f(pBitmap, j)) {
colDataSetNull_f(pDst->nullbitmap, numOfRows);
} else {
((int32_t*)pDst->pData)[numOfRows] = ((int32_t*)pDst->pData)[j];
}
numOfRows += 1;
j += 1;
break;
}
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
while (j < totalRows) {
if (pBoolList[j] == 0) {
j += 1;
continue;
}
if (colDataIsNull_f(pBitmap, j)) {
colDataSetNull_f(pDst->nullbitmap, numOfRows);
} else {
((int16_t*)pDst->pData)[numOfRows] = ((int16_t*)pDst->pData)[j];
}
numOfRows += 1;
j += 1;
break;
}
break;
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
while (j < totalRows) {
if (pBoolList[j] == 0) {
j += 1;
continue;
}
if (colDataIsNull_f(pBitmap, j)) {
colDataSetNull_f(pDst->nullbitmap, numOfRows);
} else {
((int8_t*)pDst->pData)[numOfRows] = ((int8_t*)pDst->pData)[j];
}
numOfRows += 1;
j += 1;
break;
}
break;
}
}
if (maxRows < numOfRows) {
maxRows = numOfRows;
}
}
pBlock->info.rows = maxRows;
if (pBitmap != NULL) {
taosMemoryFree(pBitmap);
}
}
int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startRowIdx) {
if (pFilterInfo == NULL || pBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
@ -138,6 +287,71 @@ _err:
return code;
}
int32_t mJoinFilterAndKeepSingleRow(SSDataBlock* pBlock, SFilterInfo* pFilterInfo) {
if (pFilterInfo == NULL || pBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
SColumnInfoData* p = NULL;
int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
int32_t status = 0;
code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
if (status == FILTER_RESULT_ALL_QUALIFIED) {
pBlock->info.rows = 1;
} else if (status == FILTER_RESULT_NONE_QUALIFIED) {
pBlock->info.rows = 0;
} else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) {
mJoinTrimKeepOneRow(pBlock, pBlock->info.rows, (bool*)p->pData);
}
code = TSDB_CODE_SUCCESS;
_err:
colDataDestroy(p);
taosMemoryFree(p);
return code;
}
int32_t mJoinFilterAndNoKeepRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo) {
if (pFilterInfo == NULL || pBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
SColumnInfoData* p = NULL;
int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
int32_t status = 0;
code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
if (status == FILTER_RESULT_NONE_QUALIFIED) {
pBlock->info.rows = 0;
}
code = TSDB_CODE_SUCCESS;
_err:
colDataDestroy(p);
taosMemoryFree(p);
return code;
}
int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) {
@ -545,6 +759,12 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi
return TSDB_CODE_OUT_OF_MEMORY;
}
}
pTable->noKeepEqGrpRows = (JOIN_STYPE_ANTI == pJoin->subType && NULL == pJoin->pFPreFilter);
pTable->multiEqGrpRows = !((JOIN_STYPE_SEMI == pJoin->subType || JOIN_STYPE_ANTI == pJoin->subType) && NULL == pJoin->pFPreFilter);
pTable->multiRowsGrp = !((JOIN_STYPE_SEMI == pJoin->subType || JOIN_STYPE_ANTI == pJoin->subType) && NULL == pJoin->pPreFilter);
} else {
pTable->multiEqGrpRows = true;
}
return TSDB_CODE_SUCCESS;
@ -586,16 +806,11 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin
}
static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
#if 0
pJoin->joinFps = &gMJoinFps[pJoin->joinType][pJoin->subType];
int32_t code = (*pJoin->joinFps->initJoinCtx)(pOperator, pJoin);
if (code) {
return code;
if (JOIN_STYPE_ASOF == pJoin->subType || JOIN_STYPE_WIN == pJoin->subType) {
//return mJoinInitWindowCtx(pJoin, pJoinNode);
}
#else
return mJoinInitMergeCtx(pJoin, pJoinNode);
#endif
}
void mJoinSetDone(SOperatorInfo* pOperator) {
@ -675,7 +890,7 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot);
SMJoinGrpRows* pGrp = NULL;
if (*(int64_t*)colDataGetData(pCol, pTable->blkRowIdx) != timestamp) {
if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
return TSDB_CODE_SUCCESS;
}
@ -690,22 +905,46 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol
pGrp->endIdx = pGrp->beginIdx;
pGrp->readMatch = false;
pGrp->blk = pTable->blk;
for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
char* pNextVal = colDataGetData(pCol, pTable->blkRowIdx);
if (timestamp == *(int64_t*)pNextVal) {
pGrp->endIdx++;
continue;
}
goto _return;
char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
if (timestamp == *(int64_t*)pEndVal) {
if (pTable->multiEqGrpRows) {
pGrp->endIdx = pTable->blk->info.rows - 1;
} else {
pGrp->endIdx = pGrp->beginIdx;
}
pTable->blkRowIdx = pTable->blk->info.rows;
} else {
for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
if (timestamp == *(int64_t*)pNextVal) {
pGrp->endIdx++;
continue;
}
if (!pTable->multiEqGrpRows) {
pGrp->endIdx = pGrp->beginIdx;
}
goto _return;
}
}
if (wholeBlk) {
if (wholeBlk && (pTable->multiEqGrpRows || restart)) {
*wholeBlk = true;
if (0 == pGrp->beginIdx) {
if (pTable->noKeepEqGrpRows) {
goto _return;
}
if (0 == pGrp->beginIdx && pTable->multiEqGrpRows) {
pGrp->blk = createOneDataBlock(pTable->blk, true);
} else {
if (!pTable->multiEqGrpRows) {
pGrp->endIdx = pGrp->beginIdx;
}
pGrp->blk = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, pGrp->endIdx - pGrp->beginIdx + 1);
pGrp->endIdx -= pGrp->beginIdx;
pGrp->beginIdx = 0;
@ -716,8 +955,12 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol
_return:
pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1;
if (pTable->noKeepEqGrpRows || (!pTable->multiEqGrpRows && !restart)) {
taosArrayPop(pTable->eqGrps);
} else {
pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1;
}
return TSDB_CODE_SUCCESS;
}
@ -835,7 +1078,7 @@ static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDat
taosArrayPush(pNewGrp, &pos);
tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, POINTER_BYTES);
} else {
} else if (pBuild->multiRowsGrp) {
taosArrayPush(*pGrpRows, &pos);
}
@ -1076,9 +1319,22 @@ int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) {
pJoin->joinFp = mInnerJoinDo;
break;
case JOIN_TYPE_LEFT:
case JOIN_TYPE_RIGHT:
pJoin->joinFp = mLeftJoinDo;
case JOIN_TYPE_RIGHT: {
switch (pJoin->subType) {
case JOIN_STYPE_OUTER:
pJoin->joinFp = mLeftJoinDo;
break;
case JOIN_STYPE_SEMI:
pJoin->joinFp = mSemiJoinDo;
break;
case JOIN_STYPE_ANTI:
pJoin->joinFp = mAntiJoinDo;
break;
default:
break;
}
break;
}
case JOIN_TYPE_FULL:
pJoin->joinFp = mFullJoinDo;
break;

File diff suppressed because it is too large Load Diff