enh: support left join

This commit is contained in:
dapan1121 2023-12-12 19:31:12 +08:00
parent bc7e9bf4f2
commit 54ea94af38
4 changed files with 597 additions and 146 deletions

View File

@ -37,14 +37,15 @@ typedef struct SHJoinCtx {
} SHJoinCtx; } SHJoinCtx;
typedef struct SHJoinColInfo { typedef struct SHJoinColInfo {
int32_t srcSlot; int32_t srcSlot;
int32_t dstSlot; int32_t dstSlot;
bool keyCol; bool keyCol;
bool vardata; bool vardata;
int32_t* offset; int32_t* offset;
int32_t bytes; int32_t bytes;
char* data; char* data;
char* bitMap; char* bitMap;
SColumnInfoData* colData;
} SHJoinColInfo; } SHJoinColInfo;
typedef struct SBufPageInfo { typedef struct SBufPageInfo {

View File

@ -20,8 +20,8 @@ extern "C" {
#endif #endif
#define MJOIN_DEFAULT_BLK_ROWS_NUM 4096 #define MJOIN_DEFAULT_BLK_ROWS_NUM 4096
#define MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM (MJOIN_DEFAULT_BLK_ROWS_NUM * 2)
#define MJOIN_HJOIN_CART_THRESHOLD 16 #define MJOIN_HJOIN_CART_THRESHOLD 16
#define MJOIN_BLK_SIZE_LIMIT 10485760
typedef SSDataBlock* (*joinImplFp)(SOperatorInfo*); typedef SSDataBlock* (*joinImplFp)(SOperatorInfo*);
@ -32,12 +32,10 @@ typedef enum EJoinTableType {
#define MJOIN_TBTYPE(_type) (E_JOIN_TB_BUILD == (_type) ? "BUILD" : "PROBE") #define MJOIN_TBTYPE(_type) (E_JOIN_TB_BUILD == (_type) ? "BUILD" : "PROBE")
typedef enum EJoinPhase { typedef struct SMJoinRowPos {
E_JOIN_PHASE_RETRIEVE, SSDataBlock* pBlk;
E_JOIN_PHASE_SPLIT, int32_t pos;
E_JOIN_PHASE_OUTPUT, } SMJoinRowPos;
E_JOIN_PHASE_DONE
} EJoinPhase;
typedef struct SMJoinColMap { typedef struct SMJoinColMap {
int32_t srcSlot; int32_t srcSlot;
@ -53,10 +51,12 @@ typedef struct SMJoinColInfo {
int32_t bytes; int32_t bytes;
char* data; char* data;
char* bitMap; char* bitMap;
SColumnInfoData* colData;
} SMJoinColInfo; } SMJoinColInfo;
typedef struct SMJoinTableInfo { typedef struct SMJoinTableInfo {
EJoinTableType type;
int32_t downStreamIdx; int32_t downStreamIdx;
SOperatorInfo* downStream; SOperatorInfo* downStream;
bool dsInitDone; bool dsInitDone;
@ -70,9 +70,6 @@ typedef struct SMJoinTableInfo {
int32_t finNum; int32_t finNum;
SMJoinColMap* finCols; SMJoinColMap* finCols;
int32_t eqNum;
SMJoinColMap* eqCols;
int32_t keyNum; int32_t keyNum;
SMJoinColInfo* keyCols; SMJoinColInfo* keyCols;
@ -87,25 +84,45 @@ typedef struct SMJoinTableInfo {
SArray* valVarCols; SArray* valVarCols;
bool valColExist; bool valColExist;
int32_t rowIdx; SSDataBlock* blk;
int32_t blkRowIdx;
int64_t grpRowsNum;
int64_t grpRemainRows;
int32_t grpIdx; int32_t grpIdx;
SArray* eqGrps; SArray* eqGrps;
SArray* createdBlks; SArray* createdBlks;
SSDataBlock* blk;
int32_t grpArrayIdx;
SArray* pGrpArrays;
// hash join
int32_t grpRowIdx;
SArray* pHashCurGrp;
SSHashObj* pGrpHash;
} SMJoinTableInfo; } SMJoinTableInfo;
typedef struct SMJoinGrpRows { typedef struct SMJoinGrpRows {
SSDataBlock* blk; SSDataBlock* blk;
int32_t beginIdx; int32_t beginIdx;
int32_t rowsNum; int32_t endIdx;
int32_t readIdx;
bool readMatch;
} SMJoinGrpRows; } SMJoinGrpRows;
typedef struct SMJoinMergeCtx { typedef struct SMJoinMergeCtx {
bool hashCan; bool hashCan;
bool rowRemains; bool keepOrder;
bool grpRemains;
bool midRemains;
bool eqCart; bool eqCart;
int64_t curTs; bool noColCond;
SMJoinGrpRows probeNEqGrps; int32_t blksCapacity;
SSDataBlock* midBlk;
SSDataBlock* finBlk;
SSDataBlock* resBlk;
int64_t lastEqTs;
SMJoinGrpRows probeNEqGrp;
bool hashJoin; bool hashJoin;
} SMJoinMergeCtx; } SMJoinMergeCtx;
@ -163,16 +180,24 @@ typedef struct SMJoinOperatorInfo {
#define MJOIN_DS_NEED_INIT(_pOp, _tbctx) (MJOIN_DS_REQ_INIT(_pOp) && (!(_tbctx)->dsInitDone)) #define MJOIN_DS_NEED_INIT(_pOp, _tbctx) (MJOIN_DS_REQ_INIT(_pOp) && (!(_tbctx)->dsInitDone))
#define MJOIN_TB_LOW_BLK(_tbctx) ((_tbctx)->blkNum <= 0 || ((_tbctx)->blkNum == 1 && (_tbctx)->pHeadBlk->cloned)) #define MJOIN_TB_LOW_BLK(_tbctx) ((_tbctx)->blkNum <= 0 || ((_tbctx)->blkNum == 1 && (_tbctx)->pHeadBlk->cloned))
#define START_NEW_GRP(_ctx) memset(&(_ctx)->currGrpPair, 0, GRP_PAIR_INIT_SIZE) #define REACH_HJOIN_THRESHOLD(_prb, _bld) ((_prb)->grpRowsNum * (_bld)->grpRowsNum > MJOIN_HJOIN_CART_THRESHOLD)
#define REACH_HJOIN_THRESHOLD(_pair) ((_pair)->buildIn.rowNum * (_pair)->probeIn.rowNum > MJOIN_HJOIN_CART_THRESHOLD)
#define SET_SAME_TS_GRP_HJOIN(_pair, _octx) ((_pair)->hashJoin = (_octx)->hashCan && REACH_HJOIN_THRESHOLD(_pair)) #define SET_SAME_TS_GRP_HJOIN(_pair, _octx) ((_pair)->hashJoin = (_octx)->hashCan && REACH_HJOIN_THRESHOLD(_pair))
#define LEFT_JOIN_NO_EQUAL(_order, _pts, _bts) ((_order) && (_pts) < (_bts)) || (!(_order) && (_pts) > (_bts)) #define LEFT_JOIN_NO_EQUAL(_order, _pts, _bts) ((_order) && (_pts) < (_bts)) || (!(_order) && (_pts) > (_bts))
#define LEFT_JOIN_DISCRAD(_order, _pts, _bts) ((_order) && (_pts) > (_bts)) || (!(_order) && (_pts) < (_bts)) #define LEFT_JOIN_DISCRAD(_order, _pts, _bts) ((_order) && (_pts) > (_bts)) || (!(_order) && (_pts) < (_bts))
// Number of rows of a group that have not been consumed yet (endIdx is inclusive).
#define GRP_REMAIN_ROWS(_g) (((_g)->endIdx - (_g)->readIdx) + 1)
// True once the read cursor has moved past the inclusive end of the group.
#define GRP_DONE(_g) ((_g)->endIdx < (_g)->readIdx)
// True when the block holds exactly as many rows as its capacity.
#define BLK_IS_FULL(_b) ((_b)->info.capacity == (_b)->info.rows)

// Loads the primary-key column of the table's current block into _col and the
// int64 value at the table's current row (blkRowIdx) into _ts.
#define SET_TABLE_CUR_TS(_col, _ts, _tb)                                         \
  do {                                                                           \
    (_col) = taosArrayGet((_tb)->blk->pDataBlock, (_tb)->primCol->srcSlot);      \
    (_ts) = ((int64_t*)(_col)->pData)[(_tb)->blkRowIdx];                         \
  } while (0)
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -402,11 +402,14 @@ static FORCE_INLINE void appendHJoinResToBlock(struct SOperatorInfo* pOperator,
} }
static FORCE_INLINE void copyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) { static FORCE_INLINE bool copyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) {
char *pData = NULL; char *pData = NULL;
size_t bufLen = 0; size_t bufLen = 0;
if (1 == pTable->keyNum) { if (1 == pTable->keyNum) {
if (colDataIsNull_s(pTable->keyCols[0].colData, rowIdx)) {
return true;
}
if (pTable->keyCols[0].vardata) { if (pTable->keyCols[0].vardata) {
pData = pTable->keyCols[0].data + pTable->keyCols[0].offset[rowIdx]; pData = pTable->keyCols[0].data + pTable->keyCols[0].offset[rowIdx];
bufLen = varDataTLen(pData); bufLen = varDataTLen(pData);
@ -417,6 +420,9 @@ static FORCE_INLINE void copyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t r
pTable->keyData = pData; pTable->keyData = pData;
} else { } else {
for (int32_t i = 0; i < pTable->keyNum; ++i) { for (int32_t i = 0; i < pTable->keyNum; ++i) {
if (colDataIsNull_s(pTable->keyCols[i].colData, rowIdx)) {
return true;
}
if (pTable->keyCols[i].vardata) { if (pTable->keyCols[i].vardata) {
pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx]; pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx];
memcpy(pTable->keyBuf + bufLen, pData, varDataTLen(pData)); memcpy(pTable->keyBuf + bufLen, pData, varDataTLen(pData));
@ -433,6 +439,8 @@ static FORCE_INLINE void copyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t r
if (pBufLen) { if (pBufLen) {
*pBufLen = bufLen; *pBufLen = bufLen;
} }
return false;
} }
@ -458,7 +466,10 @@ static void doHashJoinImpl(struct SOperatorInfo* pOperator) {
} }
for (; pCtx->probeIdx < pCtx->pProbeData->info.rows; ++pCtx->probeIdx) { for (; pCtx->probeIdx < pCtx->pProbeData->info.rows; ++pCtx->probeIdx) {
copyKeyColsDataToBuf(pProbe, pCtx->probeIdx, &bufLen); if (copyKeyColsDataToBuf(pProbe, pCtx->probeIdx, &bufLen)) {
continue;
}
SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen); SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen);
/* /*
size_t keySize = 0; size_t keySize = 0;
@ -501,6 +512,7 @@ static int32_t setKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) {
if (pTable->keyCols[i].vardata) { if (pTable->keyCols[i].vardata) {
pTable->keyCols[i].offset = pCol->varmeta.offset; pTable->keyCols[i].offset = pCol->varmeta.offset;
} }
pTable->keyCols[i].colData = pCol;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -681,7 +693,9 @@ static int32_t addBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin
size_t bufLen = 0; size_t bufLen = 0;
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
copyKeyColsDataToBuf(pBuild, i, &bufLen); if (copyKeyColsDataToBuf(pBuild, i, &bufLen)) {
continue;
}
code = addRowToHash(pJoin, pBlock, bufLen, i); code = addRowToHash(pJoin, pBlock, bufLen, i);
if (code) { if (code) {
return code; return code;

View File

@ -164,7 +164,7 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi
SMJoinTableInfo* pTable = &pJoin->tbs[idx]; SMJoinTableInfo* pTable = &pJoin->tbs[idx];
pTable->downStream = pDownstream[idx]; pTable->downStream = pDownstream[idx];
pTable->blkId = pDownstream[idx]->resultDataBlockId; pTable->blkId = pDownstream[idx]->resultDataBlockId;
int32_t code = mJoinInitPrimKeyInfo(pTable, (0 == idx) ? pJoinNode->LeftPrimSlotId : pJoinNode->rightPrimSlotId); int32_t code = mJoinInitPrimKeyInfo(pTable, (0 == idx) ? pJoinNode->leftPrimSlotId : pJoinNode->rightPrimSlotId);
if (code) { if (code) {
return code; return code;
} }
@ -217,6 +217,9 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin
pInfo->build->downStreamIdx = buildIdx; pInfo->build->downStreamIdx = buildIdx;
pInfo->probe->downStreamIdx = probeIdx; pInfo->probe->downStreamIdx = probeIdx;
pInfo->build->type = E_JOIN_TB_BUILD;
pInfo->probe->type = E_JOIN_TB_PROBE;
} }
static int32_t mJoinBuildResColMap(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode) { static int32_t mJoinBuildResColMap(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode) {
@ -267,10 +270,8 @@ static int32_t mJoinInitBufPages(SMJoinOperatorInfo* pInfo) {
static int32_t mJoinInitMergeCtx(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { static int32_t mJoinInitMergeCtx(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
pProbeCtx->type = E_JOIN_TB_PROBE; pCtx->lastEqTs = INT64_MIN;
pBuildCtx->type = E_JOIN_TB_BUILD; pCtx->hashCan = pJoin->probe->keyNum > 0;
pCtx->hashCan = pJoin->probe->eqNum > 0;
pCtx->probeEqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows)); pCtx->probeEqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows));
pCtx->probeCreatedBlks = taosArrayInit(8, POINTER_BYTES); pCtx->probeCreatedBlks = taosArrayInit(8, POINTER_BYTES);
@ -279,17 +280,15 @@ static int32_t mJoinInitMergeCtx(SOperatorInfo* pOperator, SMJoinOperatorInfo* p
pCtx->buildCreatedBlks = taosArrayInit(8, POINTER_BYTES); pCtx->buildCreatedBlks = taosArrayInit(8, POINTER_BYTES);
if (pJoin->pFPreFilter) { if (pJoin->pFPreFilter) {
pCtx->outputCtx.cartCtx.pResBlk = createOneDataBlock(pJoin->pRes); pCtx->midBlk = createOneDataBlock(pJoin->pResBlk, false);
blockDataEnsureCapacity(pCtx->outputCtx.cartCtx.pResBlk, MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM); blockDataEnsureCapacity(pCtx->midBlk, pJoin->pResBlk->info.rows);
pCtx->outputCtx.cartCtx.resThreshold = MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM * 0.75;
} else {
pCtx->outputCtx.cartCtx.pResBlk = pJoin->pRes;
pCtx->outputCtx.cartCtx.resThreshold = pOperator->resultInfo.threshold;
} }
if (!pCtx->outputCtx.hashCan && NULL == pJoin->pFPreFilter) { pCtx->finBlk = pJoin->pResBlk;
pCtx->outputCtx.cartCtx.appendRes = true;
} pCtx->blksCapacity = pJoin->pResBlk->info.rows * 2;
pCtx->resBlk = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -365,43 +364,223 @@ static FORCE_INLINE void mJoinResetTsJoinCtx(SMJoinTsJoinCtx* pCtx) {
} }
static void mLeftJoinCart(SMJoinCartCtx* pCtx) { static void mLeftJoinGrpNonEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst) {
int32_t currRows = pCtx->appendRes ? pCtx->pResBlk->info.rows : 0; SMJoinTableInfo* probe = pJoin->probe;
SMJoinTableInfo* build = pJoin->build;
int32_t currRows = append ? pRes->info.rows : 0;
int32_t firstRows = GRP_REMAIN_ROWS(pFirst);
for (int32_t c = 0; c < pCtx->firstColNum; ++c) { for (int32_t c = 0; c < probe->finNum; ++c) {
SMJoinColMap* pFirstCol = pCtx->pFirstCols + c; SMJoinColMap* pFirstCol = probe->finCols + c;
SColumnInfoData* pInCol = taosArrayGet(pCtx->pFirstBlk->pBlk, pFirstCol->srcSlot); SColumnInfoData* pInCol = taosArrayGet(pFirst->blk, pFirstCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pFirstCol->dstSlot); SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot);
for (int32_t r = 0; r < pCtx->firstRowNum; ++r) { colDataAssignNRows(pOutCol, currRows, pInCol, pFirst->readIdx, firstRows);
if (colDataIsNull_s(pInCol, pCtx->firstRowIdx + r)) {
colDataSetNItemsNull(pOutCol, currRows + r * pCtx->secondRowNum, pCtx->secondRowNum);
} else {
colDataSetNItems(pOutCol, currRows + r * pCtx->secondRowNum, colDataGetData(pInCol, pCtx->firstRowIdx + r), pCtx->secondRowNum, true);
}
}
} }
if (pCtx->firstOnly) { for (int32_t c = 0; c < build->finNum; ++c) {
ASSERT(1 == pCtx->secondRowNum); SMJoinColMap* pSecondCol = build->finCols + c;
for (int32_t c = 0; c < pCtx->secondColNum; ++c) { SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot);
SMJoinColMap* pSecondCol = pCtx->pSecondCols + c; colDataSetNItemsNull(pOutCol, currRows, firstRows);
SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pSecondCol->dstSlot);
colDataSetNItemsNull(pOutCol, currRows, pCtx->firstRowNum);
}
} else {
for (int32_t c = 0; c < pCtx->secondColNum; ++c) {
SMJoinColMap* pSecondCol = pCtx->pSecondCols + c;
SColumnInfoData* pInCol = taosArrayGet(pCtx->pSecondBlk->pBlk, pSecondCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pSecondCol->dstSlot);
for (int32_t r = 0; r < pCtx->firstRowNum; ++r) {
colDataAssignNRows(pOutCol, currRows + r * pCtx->secondRowNum, pInCol, pCtx->secondRowIdx, pCtx->secondRowNum);
}
}
} }
pCtx->pResBlk.info.rows += pCtx->firstRowNum * pCtx->secondRowNum; pRes->info.rows = append ? (pRes->info.rows + firstRows) : firstRows;
} }
// Writes the cartesian product of the unread rows of probe group `pFirst` and the
// unread rows of build group `pSecond` into `pRes`.
//
// pJoin   - join operator; supplies the probe/build output column maps (finCols).
// pRes    - destination block. Rows are written after the existing rows when `append`
//           is true, otherwise starting at row 0.
// pFirst  - probe-side group; rows [readIdx, endIdx] are used.
// pSecond - build-side group; rows [readIdx, endIdx] are used.
//
// Output is probe-major: each probe row occupies `secondRows` consecutive output rows.
// NOTE(review): no capacity check is done here; callers are expected to bound the
// group windows so that the product fits into pRes - confirm.
static void mLeftJoinGrpEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond) {
  SMJoinTableInfo* probe = pJoin->probe;
  SMJoinTableInfo* build = pJoin->build;
  int32_t currRows = append ? pRes->info.rows : 0;
  int32_t firstRows = GRP_REMAIN_ROWS(pFirst);
  int32_t secondRows = GRP_REMAIN_ROWS(pSecond);
  int32_t cartRows = firstRows * secondRows;

  // Probe columns: every probe value is repeated once per build row.
  for (int32_t c = 0; c < probe->finNum; ++c) {
    SMJoinColMap* pFirstCol = probe->finCols + c;
    SColumnInfoData* pInCol = taosArrayGet(pFirst->blk->pDataBlock, pFirstCol->srcSlot);
    SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot);
    for (int32_t r = 0; r < firstRows; ++r) {
      // The NULL test and the value read must address the same row (readIdx based).
      int32_t srcRow = pFirst->readIdx + r;
      if (colDataIsNull_s(pInCol, srcRow)) {
        colDataSetNItemsNull(pOutCol, currRows + r * secondRows, secondRows);
      } else {
        colDataSetNItems(pOutCol, currRows + r * secondRows, colDataGetData(pInCol, srcRow), secondRows, true);
      }
    }
  }

  // Build columns: the whole build window is copied once per probe row.
  for (int32_t c = 0; c < build->finNum; ++c) {
    SMJoinColMap* pSecondCol = build->finCols + c;
    SColumnInfoData* pInCol = taosArrayGet(pSecond->blk->pDataBlock, pSecondCol->srcSlot);
    SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot);
    for (int32_t r = 0; r < firstRows; ++r) {
      colDataAssignNRows(pOutCol, currRows + r * secondRows, pInCol, pSecond->readIdx, secondRows);
    }
  }

  // The product adds firstRows * secondRows rows, not just firstRows.
  pRes->info.rows = append ? (pRes->info.rows + cartRows) : cartRows;
}
// Emits the cartesian product of the current probe equal-timestamp group with the build
// equal-timestamp groups directly into pCtx->finBlk (path used when there is no pre-filter).
//
// If the whole product fits into the free space of finBlk it is written in one pass and the
// probe group index is advanced. Otherwise rows are produced one probe row at a time until
// finBlk has no room left; pCtx->grpRemains then tells the caller whether the probe group
// still has unread rows.
//
// NOTE(review): in the row-by-row path the build cursor (build->grpIdx and each build
// group's readIdx) is not rewound before the next probe row, and build->grpRemainRows is
// not decremented - confirm whether the caller handles this.
static void mLeftJoinMergeFullCart(SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) {
  int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
  SMJoinTableInfo* probe = pJoin->probe;
  SMJoinTableInfo* build = pJoin->build;
  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
  int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
  int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
  int32_t probeEndIdx = probeGrp->endIdx;

  pCtx->eqCart = true;

  // Fast path: everything that is left fits into the output block.
  if (probeRows * build->grpRemainRows <= rowsLeft) {
    for (; build->grpIdx < buildGrpNum; ++build->grpIdx) {
      SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
      mLeftJoinGrpEqCart(pJoin, pCtx->finBlk, true, probeGrp, buildGrp);
    }

    probe->grpIdx++;
    build->grpRemainRows = 0;
    pCtx->grpRemains = false;
    return;
  }

  for (; probeGrp->readIdx <= probeEndIdx; ++probeGrp->readIdx) {
    // Temporarily narrow the probe window to the single current row.
    probeGrp->endIdx = probeGrp->readIdx;

    for (; build->grpIdx < buildGrpNum; ++build->grpIdx) {
      SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
      int32_t buildRows = GRP_REMAIN_ROWS(buildGrp);

      if (rowsLeft >= buildRows) {
        mLeftJoinGrpEqCart(pJoin, pCtx->finBlk, true, probeGrp, buildGrp);
        rowsLeft -= buildRows;
        continue;
      }

      // Only part of this build group fits: emit rowsLeft rows and remember where we stopped.
      int32_t buildEndIdx = buildGrp->endIdx;
      buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
      mLeftJoinGrpEqCart(pJoin, pCtx->finBlk, true, probeGrp, buildGrp);
      buildGrp->readIdx += rowsLeft;
      buildGrp->endIdx = buildEndIdx;
      rowsLeft = 0;
      break;
    }

    // Always restore the real end of the probe group, including when the block filled up;
    // otherwise the remaining probe rows would be lost.
    probeGrp->endIdx = probeEndIdx;

    if (rowsLeft <= 0) {
      break;
    }
  }

  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
}
// Merges the block holding fewer rows into the one holding more rows, provided the combined
// row count fits into the larger block's capacity. When it does not fit nothing happens.
// NOTE(review): the overflow case is not handled yet - rows are neither copied nor split.
static void mLeftJoinCopyMergeMidBlk(SSDataBlock* pMid, SSDataBlock* pFin) {
  bool midIsSmaller = pMid->info.rows < pFin->info.rows;
  SSDataBlock* pDst = midIsSmaller ? pFin : pMid;
  SSDataBlock* pSrc = midIsSmaller ? pMid : pFin;

  int32_t totalRows = pMid->info.rows + pFin->info.rows;
  if (totalRows > pDst->info.capacity) {
    return;
  }

  blockDataMerge(pDst, pSrc);
}
// Pre-filter variant of the equal-timestamp cartesian product. Works on one probe row at a
// time: the row is joined with the build groups into pCtx->midBlk, midBlk is filtered with
// pJoin->pFPreFilter, and
//   - if rows survive, the probe row is flagged as matched (readMatch);
//   - if the build groups are exhausted and the probe row never matched, a left-join row
//     with NULL build columns is appended to pCtx->finBlk.
// On exit pCtx->grpRemains tells whether the probe group still has unread rows.
//
// NOTE(review): this routine looks unfinished (the build cursor is not rewound between probe
// rows, midBlk is not reset between passes). Only the type/macro misuse is corrected here.
static void mLeftJoinMergeSeqCart(SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) {
  SMJoinTableInfo* probe = pJoin->probe;
  SMJoinTableInfo* build = pJoin->build;
  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
  int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
  int32_t probeEndIdx = probeGrp->endIdx;
  int32_t rowsLeft = pCtx->midBlk->info.capacity;
  bool contLoop = true;

  pCtx->eqCart = true;

  do {
    // probeGrp->endIdx is narrowed to the current row inside the loop, so "group done" has
    // to be tested against the saved real end index rather than through GRP_DONE().
    for (; probeGrp->readIdx <= probeEndIdx && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) {
      probeGrp->endIdx = probeGrp->readIdx;
      for (; build->grpIdx < buildGrpNum; ++build->grpIdx) {
        SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);

        if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
          mLeftJoinGrpEqCart(pJoin, pCtx->midBlk, true, probeGrp, buildGrp);
          rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
          continue;
        }

        // Partial fit: emit rowsLeft build rows and remember the position.
        int32_t buildEndIdx = buildGrp->endIdx;
        buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
        mLeftJoinGrpEqCart(pJoin, pCtx->midBlk, true, probeGrp, buildGrp);
        buildGrp->readIdx += rowsLeft;
        buildGrp->endIdx = buildEndIdx;
        rowsLeft = 0;
        break;
      }

      doFilter(pCtx->midBlk, pJoin->pFPreFilter, NULL);
      if (pCtx->midBlk->info.rows > 0) {
        probeGrp->readMatch = true;
      } else if (build->grpIdx == buildGrpNum && !probeGrp->readMatch) {
        // No build row passed the filter for this probe row: output it with NULL build side.
        mLeftJoinGrpNonEqCart(pJoin, pCtx->finBlk, true, probeGrp);
        continue;
      }

      if (pCtx->midBlk->info.rows >= pJoin->pOperator->resultInfo.threshold) {
        contLoop = false;
        break;
      }

      rowsLeft = pCtx->midBlk->info.capacity - pCtx->midBlk->info.rows;
      // midBlk/finBlk are already SSDataBlock*; pass them as such.
      mLeftJoinCopyMergeMidBlk(pCtx->midBlk, pCtx->finBlk);
      break;
    }

    if (probeGrp->readIdx > probeEndIdx || BLK_IS_FULL(pCtx->finBlk)) {
      break;
    }
  } while (contLoop);

  probeGrp->endIdx = probeEndIdx;
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
}
// Dispatches the equal-timestamp cartesian product: the sequential (filtering) variant is
// used when a pre-filter is present, the full variant otherwise.
static void mLeftJoinMergeCart(SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) {
  if (NULL != pJoin->pFPreFilter) {
    mLeftJoinMergeSeqCart(pJoin, pCtx);
    return;
  }

  mLeftJoinMergeFullCart(pJoin, pCtx);
}
// Outputs probe rows that have no matching build timestamp (pCtx->probeNEqGrp) into
// pCtx->finBlk with the build columns set to NULL.
//
// Emits as many unread rows of the group as fit into the free space of finBlk, advances the
// group's read cursor past them, and sets pCtx->grpRemains when rows had to be left for the
// next call. pCtx->eqCart is cleared so the resume path knows which routine to re-enter.
static void mLeftJoinNonEqCart(SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) {
  int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
  SMJoinGrpRows* probeGrp = &pCtx->probeNEqGrp;
  int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
  int32_t probeEndIdx = probeGrp->endIdx;
  bool fitsAll = probeRows <= rowsLeft;

  pCtx->eqCart = false;

  if (!fitsAll) {
    // Shrink the window to what the output block can still take.
    probeGrp->endIdx = probeGrp->readIdx + rowsLeft - 1;
  }

  // mLeftJoinGrpNonEqCart takes exactly four arguments (no build group).
  mLeftJoinGrpNonEqCart(pJoin, pCtx->finBlk, true, probeGrp);

  probeGrp->readIdx = probeGrp->endIdx + 1;
  probeGrp->endIdx = probeEndIdx;
  pCtx->grpRemains = !fitsAll;
}
static bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableInfo* ppTb) { static bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableInfo* ppTb) {
if (!(*ppTb)->dsFetchDone && (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows)) { if (!(*ppTb)->dsFetchDone && (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows)) {
(*ppBlk) = getNextBlockFromDownstreamRemain(pJoin->pOperator, (*ppTb)->downStreamIdx); (*ppBlk) = getNextBlockFromDownstreamRemain(pJoin->pOperator, (*ppTb)->downStreamIdx);
@ -421,12 +600,12 @@ static bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBl
static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) {
bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->rowIdx, &pJoin->probe->blk, &pJoin->probe); bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, &pJoin->probe);
bool buildGot = false; bool buildGot = false;
do { do {
if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->rowIdx, &pJoin->build->blk, &pJoin->build); buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, &pJoin->build);
} }
if (NULL == pJoin->probe->blk) { if (NULL == pJoin->probe->blk) {
@ -435,7 +614,7 @@ static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoi
} else if (buildGot && probeGot) { } else if (buildGot && probeGot) {
SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot); SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot);
SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot); SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot);
if (*((int64_t*)pProbeCol->pData + pJoin->probe->rowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) { if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) {
continue; continue;
} }
} }
@ -494,16 +673,21 @@ static void mJoinBuildEqGroups(SOperatorInfo* pOperator, SMJoinTableInfo* pTable
pGrp = taosArrayReserve(pTable->eqGrps, 1); pGrp = taosArrayReserve(pTable->eqGrps, 1);
} }
pGrp->beginIdx = pTable->rowIdx++; pGrp->beginIdx = pTable->blkRowIdx++;
pGrp->rowsNum = 1; pGrp->readIdx = pGrp->beginIdx;
pGrp->endIdx = pGrp->beginIdx;
pGrp->readMatch = false;
pGrp->blk = pTable->blk; pGrp->blk = pTable->blk;
for (; pTable->rowIdx < pTable->blk->info.rows; ++pTable->rowIdx) { for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
char* pNextVal = colDataGetData(pCol, pTable->rowIdx); char* pNextVal = colDataGetData(pCol, pTable->blkRowIdx);
if (timestamp == *(int64_t*)pNextVal) { if (timestamp == *(int64_t*)pNextVal) {
pGrp->rowsNum++; pGrp->endIdx++;
continue; continue;
} }
pTable->grpRowsNum += pGrp->endIdx - pGrp->beginIdx + 1;
pTable->grpRemainRows = pTable->grpRowsNum;
return; return;
} }
@ -512,18 +696,19 @@ static void mJoinBuildEqGroups(SOperatorInfo* pOperator, SMJoinTableInfo* pTable
if (0 == pGrp->beginIdx) { if (0 == pGrp->beginIdx) {
pGrp->blk = createOneDataBlock(pTable->blk, true); pGrp->blk = createOneDataBlock(pTable->blk, true);
} else { } else {
pGrp->blk = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, pGrp->rowsNum - pGrp->beginIdx); pGrp->blk = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, pGrp->endIdx - pGrp->beginIdx + 1);
} }
taosArrayPush(pTable->createdBlks, &pGrp->blk); taosArrayPush(pTable->createdBlks, &pGrp->blk);
pGrp->beginIdx = 0; pGrp->beginIdx = 0;
} }
pTable->grpRowsNum += pGrp->endIdx - pGrp->beginIdx + 1;
pTable->grpRemainRows = pTable->grpRowsNum;
} }
static int32_t mJoinRetrieveSameTsRows(SOperatorInfo* pOperator, SMJoinTableInfo* pTable, int64_t timestamp) { static int32_t mJoinRetrieveSameTsRows(SOperatorInfo* pOperator, SMJoinTableInfo* pTable, int64_t timestamp) {
SMJoinOperatorInfo* pJoin = pOperator->info; SMJoinOperatorInfo* pJoin = pOperator->info;
int32_t endPos = -1;
SSDataBlock* dataBlock = startDataBlock;
bool allBlk = false; bool allBlk = false;
mJoinBuildEqGroups(pOperator, pTable, timestamp, &allBlk, true); mJoinBuildEqGroups(pOperator, pTable, timestamp, &allBlk, true);
@ -532,7 +717,7 @@ static int32_t mJoinRetrieveSameTsRows(SOperatorInfo* pOperator, SMJoinTableInfo
pTable->blk = getNextBlockFromDownstreamRemain(pOperator, pTable->downStreamIdx); pTable->blk = getNextBlockFromDownstreamRemain(pOperator, pTable->downStreamIdx);
qDebug("merge join %s table got block for same ts, rows:%" PRId64, MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0); qDebug("merge join %s table got block for same ts, rows:%" PRId64, MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);
pTable->rowIdx = 0; pTable->blkRowIdx = 0;
if (NULL == pTable->blk) { if (NULL == pTable->blk) {
pTable->dsFetchDone = true; pTable->dsFetchDone = true;
@ -546,96 +731,323 @@ static int32_t mJoinRetrieveSameTsRows(SOperatorInfo* pOperator, SMJoinTableInfo
return 0; return 0;
} }
static int32_t mJoinEqualCart(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes) { static int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableInfo* pTable) {
int32_t code = TSDB_CODE_SUCCESS; for (int32_t i = 0; i < pTable->keyNum; ++i) {
SMJoinOperatorInfo* pJoin = pOperator->info; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot);
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) {
SSHashObj* rightTableHash = NULL; qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->keyCols[i].srcSlot, pCol->info.type, pTable->keyCols[i].vardata);
bool rightUseBuildTable = false; return TSDB_CODE_INVALID_PARA;
if (!pCtx->rowRemains) {
mJoinBuildEqGroups(pOperator, pJoin->probe, timestamp, NULL, true);
mJoinRetrieveSameTsRows(pOperator, pJoin->build, timestamp);
if (pJoinInfo->pColEqualOnConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) {
mergeJoinFillBuildTable(pJoinInfo, rightRowLocations);
pCtx->hashJoin = true;
taosArrayDestroy(rightRowLocations);
rightRowLocations = NULL;
} }
if (pTable->keyCols[i].bytes != pCol->info.bytes) {
qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pTable->keyCols[i].srcSlot, pCol->info.bytes, pTable->keyCols[i].bytes);
return TSDB_CODE_INVALID_PARA;
}
pTable->keyCols[i].data = pCol->pData;
if (pTable->keyCols[i].vardata) {
pTable->keyCols[i].offset = pCol->varmeta.offset;
}
pTable->keyCols[i].colData = pCol;
} }
bool reachThreshold = false; return TSDB_CODE_SUCCESS;
}
if (code == TSDB_CODE_SUCCESS) {
mLeftJoinCart(pOperator, pRes, nRows, leftRowLocations, leftRowIdx,
rightRowIdx, pCtx->hashJoin, rightRowLocations, &reachThreshold);
}
if (!reachThreshold) {
mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks,
pCtx->hashJoin, rightRowLocations);
static FORCE_INLINE true mJoinCopyKeyColsDataToBuf(SMJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) {
char *pData = NULL;
size_t bufLen = 0;
if (1 == pTable->keyNum) {
if (colDataIsNull_s(pTable->keyCols[0].colData, rowIdx)) {
return true;
}
if (pTable->keyCols[0].vardata) {
pData = pTable->keyCols[0].data + pTable->keyCols[0].offset[rowIdx];
bufLen = varDataTLen(pData);
} else {
pData = pTable->keyCols[0].data + pTable->keyCols[0].bytes * rowIdx;
bufLen = pTable->keyCols[0].bytes;
}
pTable->keyData = pData;
} else { } else {
pJoinInfo->rowCtx.rowRemains = true; for (int32_t i = 0; i < pTable->keyNum; ++i) {
pJoinInfo->rowCtx.ts = timestamp; if (colDataIsNull_s(pTable->keyCols[i].colData, rowIdx)) {
pJoinInfo->rowCtx.leftRowLocations = leftRowLocations; return true;
pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks; }
pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks; if (pTable->keyCols[i].vardata) {
pJoinInfo->rowCtx.rightRowLocations = rightRowLocations; pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx];
memcpy(pTable->keyBuf + bufLen, pData, varDataTLen(pData));
bufLen += varDataTLen(pData);
} else {
pData = pTable->keyCols[i].data + pTable->keyCols[i].bytes * rowIdx;
memcpy(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes);
bufLen += pTable->keyCols[i].bytes;
}
}
pTable->keyData = pTable->keyBuf;
} }
if (pBufLen) {
*pBufLen = bufLen;
}
return false;
}
// Hands out the next row-position array from the table's pool (pTable->pGrpArrays), growing
// the pool by one array when all existing ones are in use.
//
// pTable - table owning the pool; grpArrayIdx is advanced past the returned array.
// ppRes  - receives the array.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when a new array cannot be created
// or registered in the pool.
// NOTE(review): an array taken from the pool is returned as-is; if pooled arrays can still
// hold rows from an earlier use they need to be cleared somewhere - confirm.
static int32_t mJoinGetAvailableGrpArray(SMJoinTableInfo* pTable, SArray** ppRes) {
  do {
    if (pTable->grpArrayIdx < taosArrayGetSize(pTable->pGrpArrays)) {
      *ppRes = taosArrayGetP(pTable->pGrpArrays, pTable->grpArrayIdx++);
      return TSDB_CODE_SUCCESS;
    }

    SArray* pNew = taosArrayInit(4, sizeof(SMJoinRowPos));
    if (NULL == pNew) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // If the pool cannot take the new array, release it instead of leaking it and retrying.
    if (NULL == taosArrayPush(pTable->pGrpArrays, &pNew)) {
      taosArrayDestroy(pNew);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  } while (true);
}
// Records one build-side row under the key currently held in pJoin->build->keyData.
//
// pJoin  - join operator; its build table owns the group hash (pGrpHash).
// keyLen - length in bytes of build->keyData.
// pBlock - block containing the row.
// rowIdx - row position inside pBlock.
//
// If the key already exists the row position is appended to its array; otherwise a pooled
// array is obtained, the position is stored in it and the array is put into the hash.
// Returns TSDB_CODE_SUCCESS or an error code from the array/hash helpers.
static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDataBlock* pBlock, int32_t rowIdx) {
  SMJoinTableInfo* pBuild = pJoin->build;
  SMJoinRowPos pos = {pBlock, rowIdx};
  SArray** pGrpRows = tSimpleHashGet(pBuild->pGrpHash, pBuild->keyData, keyLen);

  if (pGrpRows) {
    if (NULL == taosArrayPush(*pGrpRows, &pos)) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    return TSDB_CODE_SUCCESS;
  }

  SArray* pNewGrp = NULL;
  int32_t code = mJoinGetAvailableGrpArray(pBuild, &pNewGrp);
  if (code) {
    return code;
  }

  if (NULL == taosArrayPush(pNewGrp, &pos)) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Propagate a failed insert instead of silently dropping the group.
  return tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, POINTER_BYTES);
}
// Builds the key -> row-positions hash for all equal-timestamp groups of `pTable`.
//
// For every group the key columns are bound to the group's block, then each unread row of
// the group is hashed by its key columns. Rows with a NULL key column are skipped.
//
// Returns TSDB_CODE_SUCCESS, or the first error reported while binding key columns or
// inserting a row. (The function reports errors, so it returns int32_t; the caller in this
// file already assigns its result.)
static int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableInfo* pTable) {
  int32_t grpNum = taosArrayGetSize(pTable->eqGrps);

  for (int32_t g = 0; g < grpNum; ++g) {
    SMJoinGrpRows* pGrp = taosArrayGet(pTable->eqGrps, g);
    int32_t code = mJoinSetKeyColsData(pGrp->blk, pTable);
    if (code) {
      return code;
    }

    int32_t grpRows = GRP_REMAIN_ROWS(pGrp);
    size_t bufLen = 0;
    for (int32_t r = 0; r < grpRows; ++r) {
      // The key must be read from the same block row whose position is stored in the hash.
      int32_t rowIdx = pGrp->beginIdx + r;
      if (mJoinCopyKeyColsDataToBuf(pTable, rowIdx, &bufLen)) {
        continue;
      }

      code = mJoinAddRowToHash(pJoin, bufLen, pGrp->blk, rowIdx);
      if (code) {
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
// Joins the current probe row (probeGrp->readIdx) with the rows of the build hash group
// build->pHashCurGrp, starting at build->grpRowIdx, and writes the result into pCtx->resBlk.
//
// At most as many rows as resBlk has free capacity are produced. Afterwards
// build->grpRowIdx is -1 when the hash group was fully consumed, otherwise it points at the
// first build row that still has to be emitted.
//
// Returns false when the produced rows used up all free space of resBlk (caller must stop),
// true otherwise.
static bool mLeftJoinHashRowCart(SMJoinMergeCtx* pCtx, SMJoinGrpRows* probeGrp, SMJoinTableInfo* probe, SMJoinTableInfo* build) {
  int32_t rowsLeft = pCtx->resBlk->info.capacity - pCtx->resBlk->info.rows;
  int32_t buildGrpRows = taosArrayGetSize(build->pHashCurGrp);
  int32_t grpRows = buildGrpRows - build->grpRowIdx;
  int32_t actRows = TMIN(grpRows, rowsLeft);
  int32_t currRows = pCtx->noColCond ? pCtx->resBlk->info.rows : 0;

  // Probe columns: the single probe row is repeated for every emitted build row.
  for (int32_t c = 0; c < probe->finNum; ++c) {
    SMJoinColMap* pFirstCol = probe->finCols + c;
    SColumnInfoData* pInCol = taosArrayGet(probeGrp->blk->pDataBlock, pFirstCol->srcSlot);
    SColumnInfoData* pOutCol = taosArrayGet(pCtx->resBlk->pDataBlock, pFirstCol->dstSlot);
    if (colDataIsNull_s(pInCol, probeGrp->readIdx)) {
      colDataSetNItemsNull(pOutCol, currRows, actRows);
    } else {
      // Read the value of the row being processed (readIdx), not the group's first row.
      colDataSetNItems(pOutCol, currRows, colDataGetData(pInCol, probeGrp->readIdx), actRows, true);
    }
  }

  // Build columns: copy row by row, since hashed rows may live in different blocks.
  for (int32_t c = 0; c < build->finNum; ++c) {
    SMJoinColMap* pSecondCol = build->finCols + c;
    SColumnInfoData* pOutCol = taosArrayGet(pCtx->resBlk->pDataBlock, pSecondCol->dstSlot);
    for (int32_t r = 0; r < actRows; ++r) {
      // Honour the resume offset so a continued group does not re-emit its first rows.
      SMJoinRowPos* pRow = taosArrayGet(build->pHashCurGrp, build->grpRowIdx + r);
      SColumnInfoData* pInCol = taosArrayGet(pRow->pBlk->pDataBlock, pSecondCol->srcSlot);
      colDataAssignNRows(pOutCol, currRows + r, pInCol, pRow->pos, 1);
    }
  }

  pCtx->resBlk->info.rows += actRows;

  if (actRows == grpRows) {
    build->grpRowIdx = -1;
  } else {
    build->grpRowIdx += actRows;
  }

  return actRows != rowsLeft;
}
// Hash-based join of the current probe equal-timestamp group against the build hash.
//
// First finishes a build hash group that was interrupted by a full output block
// (build->grpRowIdx >= 0), then walks the remaining probe rows [readIdx, endIdx], looks each
// key up in build->pGrpHash and emits the matches. Probe rows with a NULL key column are
// skipped. On exit pCtx->grpRemains tells whether probe rows are still unread.
//
// NOTE(review): two cases are still unimplemented in this routine: a probe row without a
// hash match (should presumably produce a NULL-extended left-join row) and the path with
// pJoin->pPreFilter set.
static void mLeftJoinHashCart(SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) {
  SMJoinTableInfo* probe = pJoin->probe;
  SMJoinTableInfo* build = pJoin->build;
  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);

  if (pJoin->build->grpRowIdx >= 0) {
    bool contLoop = mLeftJoinHashRowCart(pCtx, probeGrp, probe, build);
    if (build->grpRowIdx < 0) {
      probeGrp->readIdx++;
    }

    if (!contLoop) {
      pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
      return;
    }
  }

  pCtx->eqCart = true;

  size_t bufLen = 0;
  // endIdx is inclusive, so the last row must be visited as well.
  for (; probeGrp->readIdx <= probeGrp->endIdx; ++probeGrp->readIdx) {
    if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
      continue;
    }

    SArray** pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
    if (NULL == pGrp) {
      // TODO: no build match for this probe row
      continue;
    }

    build->pHashCurGrp = *pGrp;
    build->grpRowIdx = 0;

    if (NULL != pJoin->pPreFilter) {
      // TODO: pre-filter path
      continue;
    }

    if (!mLeftJoinHashRowCart(pCtx, probeGrp, probe, build)) {
      // Output block is full. If the hash group happened to be finished at the same time,
      // step past this probe row so it is not joined again on resume.
      if (build->grpRowIdx < 0) {
        probeGrp->readIdx++;
      }
      break;
    }
  }

  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
}
/*
 * Processes one group of probe/build rows that share the same primary timestamp.
 *
 * Builds the probe-side equal groups and retrieves the build-side rows for `timestamp`,
 * then joins them either through a hash table (when hashing is allowed and
 * REACH_HJOIN_THRESHOLD holds) or through mLeftJoinMergeCart().
 *
 * pCtx->hashJoin records which strategy was chosen so that a later resume
 * (mLeftJoinHandleRowRemains) continues with the matching routine.
 *
 * pRes is not used by this function.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code from building the hash table or from
 * setting the probe key column data.
 */
static int32_t mLeftJoinProcessEqualGrp(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes) {
  SMJoinOperatorInfo* pJoin = pOperator->info;
  SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;

  mJoinBuildEqGroups(pOperator, pJoin->probe, timestamp, NULL, true);
  mJoinRetrieveSameTsRows(pOperator, pJoin->build, timestamp);

  if (pCtx->hashCan && REACH_HJOIN_THRESHOLD(pJoin->probe, pJoin->build)) {
    int32_t code = mJoinMakeBuildTbHash(pJoin, pJoin->build);
    if (code) {
      return code;
    }
    code = mJoinSetKeyColsData(pJoin->probe->blk, pJoin->probe);
    if (code) {
      return code;
    }
    pCtx->hashJoin = true;
    mLeftJoinHashCart(pJoin, pCtx);
  } else {
    // Clear the flag so a previous hash-joined group cannot make the resume path
    // pick the hash routine for this merge-joined group.
    pCtx->hashJoin = false;
    mLeftJoinMergeCart(pJoin, pCtx);
  }

  return TSDB_CODE_SUCCESS;
}
/*
 * Continues the cartesian output of a group that was interrupted earlier.
 *
 * Dispatches to the non-equal, hash or merge routine according to pCtx->eqCart and
 * pCtx->hashJoin, then reports whether the caller may keep producing rows.
 *
 * Returns false once pRes holds at least pOperator->resultInfo.threshold rows,
 * true otherwise.
 */
static bool mLeftJoinHandleRowRemains(struct SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx, SSDataBlock* pRes) {
  if (!pCtx->eqCart) {
    mLeftJoinNonEqCart(pJoin, pCtx);
  } else if (pCtx->hashJoin) {
    mLeftJoinHashCart(pJoin, pCtx);
  } else {
    mLeftJoinMergeCart(pJoin, pCtx);
  }

  // Keep going only while the result block is still below the flush threshold.
  return pRes->info.rows < pOperator->resultInfo.threshold;
}
static void mLeftJoinDo(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { static void mLeftJoinDo(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
SMJoinOperatorInfo* pJoin = pOperator->info; SMJoinOperatorInfo* pJoin = pOperator->info;
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
int64_t probeTs = INT64_MIN; int64_t probeTs = 0;
int64_t buildTs = 0; int64_t buildTs = 0;
SColumnInfoData* pBuildCol = NULL; SColumnInfoData* pBuildCol = NULL;
SColumnInfoData* pProbeCol = NULL; SColumnInfoData* pProbeCol = NULL;
bool asc = (pJoin->inputTsOrder == TSDB_ORDER_ASC) ? true : false;
bool asc = (pJoin->inputOrder == TSDB_ORDER_ASC) ? true : false; if (pCtx->grpRemains && !mLeftJoinHandleRowRemains(pOperator, pJoin, pCtx, pRes)) {
return;
}
do { do {
if (pCtx->rowRemains) { if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) {
probeTs = buildTs = pCtx->curTs; break;
} else {
if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx, probeTs)) {
break;
}
pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot);
pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot);
probeTs = *((int64_t*)pProbeCol->pData + pJoin->probe->rowIdx);
buildTs = *((int64_t*)pBuildCol->pData + pCtx->buildIdx);
} }
while (pCtx->probeIdx < pJoin->probe->blk->info.rows && pCtx->buildIdx < pJoin->build->blk->info.rows) { SET_TABLE_CUR_TS(pBuildCol, buildTs, pJoin->build);
SET_TABLE_CUR_TS(pProbeCol, probeTs, pJoin->probe);
if (probeTs == pCtx->lastEqTs) {
mLeftJoinProcessEqualGrp(pOperator, probeTs, pRes);
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
return;
}
if (pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) {
SET_TABLE_CUR_TS(pProbeCol, probeTs, pJoin->probe);
} else {
continue;
}
}
while (pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows && pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
if (probeTs == buildTs) { if (probeTs == buildTs) {
mJoinEqualCart(pOperator, probeTs, pRes); pCtx->lastEqTs = probeTs;
mLeftJoinProcessEqualGrp(pOperator, probeTs, pRes);
if (pRes->info.rows >= pOperator->resultInfo.threshold) { if (pRes->info.rows >= pOperator->resultInfo.threshold) {
return; return;
} }
break;
SET_TABLE_CUR_TS(pBuildCol, buildTs, pJoin->build);
SET_TABLE_CUR_TS(pProbeCol, probeTs, pJoin->probe);
} else if (LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)) { } else if (LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)) {
pCtx->probeNEqGrps.beginIdx = pCtx->probeIdx; pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
do { do {
pCtx->probeNEqGrps.rowsNum++; pCtx->probeNEqGrp.endIdx = pJoin->probe->blkRowIdx;
probeTs = *((int64_t*)pProbeCol->pData + (++pCtx->probeIdx)); probeTs = *((int64_t*)pProbeCol->pData + (++pJoin->probe->blkRowIdx));
} while (pCtx->probeIdx < pJoin->probe->blk->info.rows && LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)); } while (pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows && LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs));
mJoinNonEqualCart(pOperator, &pCtx->probeNEqGrps, pRes); mLeftJoinNonEqCart(pJoin, pCtx);
if (pRes->info.rows >= pOperator->resultInfo.threshold) { if (pRes->info.rows >= pOperator->resultInfo.threshold) {
return; return;
} }
} else { } else {
buildTs = *((int64_t*)pBuildCol->pData + (++pCtx->buildIdx)); buildTs = *((int64_t*)pBuildCol->pData + (++pJoin->build->blkRowIdx));
while (pCtx->buildIdx < pJoin->build->blk->info.rows && LEFT_JOIN_DISCRAD(asc, probeTs, buildTs)) { while (pJoin->build->blkRowIdx < pJoin->build->blk->info.rows && LEFT_JOIN_DISCRAD(asc, probeTs, buildTs)) {
buildTs = *((int64_t*)pBuildCol->pData + (++pCtx->buildIdx)); buildTs = *((int64_t*)pBuildCol->pData + (++pJoin->build->blkRowIdx));
} }
} }
} }
@ -711,9 +1123,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
} }
int32_t numOfCols = 0; int32_t numOfCols = 0;
pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); pInfo->pResBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
initResultSizeInfo(&pOperator->resultInfo, MJOIN_DEFAULT_BLK_ROWS_NUM); blockDataEnsureCapacity(pInfo->pResBlk, TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc.totalRowSize));
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);