enh: left join refact

This commit is contained in:
dapan1121 2023-12-08 19:25:05 +08:00
parent 7fe919ad16
commit bc7e9bf4f2
2 changed files with 263 additions and 590 deletions

View File

@ -30,6 +30,8 @@ typedef enum EJoinTableType {
E_JOIN_TB_PROBE
} EJoinTableType;
#define MJOIN_TBTYPE(_type) (E_JOIN_TB_BUILD == (_type) ? "BUILD" : "PROBE")
typedef enum EJoinPhase {
E_JOIN_PHASE_RETRIEVE,
E_JOIN_PHASE_SPLIT,
@ -53,41 +55,13 @@ typedef struct SMJoinColInfo {
char* bitMap;
} SMJoinColInfo;
typedef struct SMJoinCartCtx {
bool appendRes;
bool firstOnly;
int32_t resThreshold;
SSDataBlock* pResBlk;
int32_t firstColNum;
SMJoinColMap* pFirstCols;
int32_t secondColNum;
SMJoinColMap* pSecondCols;
SMJoinBlkInfo* pFirstBlk;
SMJoinBlkInfo* pSecondBlk;
int32_t firstRowIdx;
int32_t firstRowNum;
int32_t secondRowIdx;
int32_t secondRowNum;
} SMJoinCartCtx;
typedef struct SMJoinBlkInfo {
bool cloned;
bool inUse;
SSDataBlock* pBlk;
void* pNext;
} SMJoinBlkInfo;
typedef struct SMJoinRowInfo {
int64_t blkId;
int32_t rowIdx;
int64_t rowGIdx;
} SMJoinRowInfo;
typedef struct SMJoinTableInfo {
int32_t downStreamIdx;
SOperatorInfo* downStream;
bool dsInitDone;
bool dsFetchDone;
int32_t blkId;
SQueryStat inputStat;
@ -112,97 +86,27 @@ typedef struct SMJoinTableInfo {
int32_t valBufSize;
SArray* valVarCols;
bool valColExist;
int32_t rowIdx;
int32_t grpIdx;
SArray* eqGrps;
SArray* createdBlks;
SSDataBlock* blk;
} SMJoinTableInfo;
typedef struct SMJoinTsJoinCtx {
SMJoinTableCtx* pProbeCtx;
SMJoinTableCtx* pBuildCtx;
int64_t probeRowNum;
int64_t buildRowNum;
int64_t* probeTs;
int64_t* buildTs;
int64_t* probeEndTs;
int64_t* buildEndTs;
bool inSameTsGrp;
bool inDiffTsGrp;
bool nextProbeRow;
SGrpPairRes* pLastGrpPair;
SGrpPairRes currGrpPair;
} SMJoinTsJoinCtx;
typedef struct SBuildGrpResIn {
bool multiBlk;
SMJoinBlkInfo* pBeginBlk;
int32_t rowBeginIdx;
int32_t rowNum;
} SBuildGrpResIn;
typedef struct SBuildGrpResOut {
SSHashObj* pHash;
SMJoinBlkInfo* pCurrBlk;
int32_t rowReadIdx;
int32_t rowGReadNum;
} SBuildGrpResOut;
typedef struct SProbeGrpResIn {
bool allRowsGrp;
SMJoinBlkInfo* pBeginBlk;
int32_t rowBeginIdx;
int32_t rowNum;
int64_t grpLastTs;
} SProbeGrpResIn;
typedef struct SProbeGrpResOut {
SMJoinBlkInfo* pCurrBlk;
int32_t rowReadIdx;
int32_t rowGReadNum;
} SProbeGrpResOut;
typedef struct SGrpPairRes {
bool sameTsGrp;
bool finishGrp;
bool hashJoin;
SProbeGrpResIn prbIn;
SBuildGrpResIn bldIn;
/* KEEP THIS PART AT THE END */
bool outBegin;
SBuildGrpResOut bldOut;
SProbeGrpResOut prbOut;
/* KEEP THIS PART AT THE END */
} SGrpPairRes;
#define GRP_PAIR_INIT_SIZE (sizeof(SGrpPairRes) - sizeof(bool) - sizeof(SBuildGrpResOut) - sizeof(SProbeGrpResOut))
typedef struct SMJoinOutputCtx {
bool hashCan;
int32_t grpReadIdx;
int64_t grpCurTs;
SMJoinCartCtx cartCtx;
SArray* pGrpResList;
} SMJoinOutputCtx;
typedef struct SMJoinTableCtx {
EJoinTableType type;
void* blkFetchedFp;
SMJoinTableInfo* pTbInfo;
bool dsInitDone;
bool dsFetchDone;
int64_t blkCurTs;
int32_t blkRowIdx;
int64_t blkIdx;
int64_t blkNum;
SMJoinBlkInfo* pCurrBlk;
SMJoinBlkInfo* pHeadBlk;
SMJoinBlkInfo* pTailBlk;
} SMJoinTableCtx;
typedef struct SMJoinGrpRows {
SSDataBlock* blk;
int32_t beginIdx;
int32_t rowsNum;
} SMJoinGrpRows;
typedef struct SMJoinMergeCtx {
EJoinPhase joinPhase;
SMJoinOutputCtx outputCtx;
SMJoinTsJoinCtx tsJoinCtx;
SMJoinTableCtx buildTbCtx;
SMJoinTableCtx probeTbCtx;
bool hashCan;
bool rowRemains;
bool eqCart;
int64_t curTs;
SMJoinGrpRows probeNEqGrps;
bool hashJoin;
} SMJoinMergeCtx;
typedef struct SMJoinWinCtx {
@ -242,9 +146,9 @@ typedef struct SMJoinOperatorInfo {
int32_t subType;
int32_t inputTsOrder;
SMJoinTableInfo tbs[2];
SMJoinTableInfo* pBuild;
SMJoinTableInfo* pProbe;
SSDataBlock* pRes;
SMJoinTableInfo* build;
SMJoinTableInfo* probe;
SSDataBlock* pResBlk;
int32_t pResColNum;
int8_t* pResColMap;
SFilterInfo* pFPreFilter;
@ -265,35 +169,9 @@ typedef struct SMJoinOperatorInfo {
#define SET_SAME_TS_GRP_HJOIN(_pair, _octx) ((_pair)->hashJoin = (_octx)->hashCan && REACH_HJOIN_THRESHOLD(_pair))
#define BUILD_TB_BROKEN_BLK(_sg, _out, _in) ((_sg) && (((_out)->pCurrBlk == (_in)->pBeginBlk && (_out)->rowReadIdx != (_in)->rowBeginIdx) || ((_out)->pCurrBlk != (_in)->pBeginBlk && (_out)->rowReadIdx != 0)))
#define LEFT_JOIN_NO_EQUAL(_order, _pts, _bts) ((_order) && (_pts) < (_bts)) || (!(_order) && (_pts) > (_bts))
#define LEFT_JOIN_DISCRAD(_order, _pts, _bts) ((_order) && (_pts) > (_bts)) || (!(_order) && (_pts) < (_bts))
#define FIN_SAME_TS_GRP(_ctx, _octx, _done) do { \
if ((_ctx)->inSameTsGrp) { \
(_ctx)->currGrpPair.sameTsGrp = true; \
(_ctx)->currGrpPair.finishGrp = (_done); \
SET_SAME_TS_GRP_HJOIN(&(_ctx)->currGrpPair, _octx); \
(_ctx)->inSameTsGrp = false; \
(_ctx)->pLastGrpPair = taosArrayPush((_octx)->pGrpResList, &(_ctx)->currGrpPair); \
} \
} while (0)
#define FIN_DIFF_TS_GRP(_ctx, _octx, _done) do { \
if ((_ctx)->inDiffTsGrp) { \
(_ctx)->currGrpPair.sameTsGrp = false; \
(_ctx)->currGrpPair.finishGrp = true; \
(_ctx)->currGrpPair.probeIn.allRowsGrp= (_done); \
(_ctx)->inDiffTsGrp = false; \
(_ctx)->pLastGrpPair = taosArrayPush((_octx)->pGrpResList, &(_ctx)->currGrpPair); \
} else if (_done) { \
(_ctx)->currGrpPair.sameTsGrp = false; \
(_ctx)->currGrpPair.finishGrp = true; \
(_ctx)->currGrpPair.probeIn.grpRowBeginIdx = (_ctx)->pProbeCtx->blkRowIdx; \
(_ctx)->currGrpPair.probeIn.allRowsGrp = true; \
} \
} while (0)
#define PRB_CUR_BLK_GRP_ROWS(_rn, _rb, _bn) (((_rn) + (_rb)) <= (_bn) ? (_rn) : ((_bn) - (_rb)))
#define BLD_CUR_BLK_GRP_ROWS(_sg, _rn, _rb, _bn) ((_sg) ? 1 : (((_rn) + (_rb)) <= (_bn) ? (_rn) : ((_bn) - (_rb))))
#ifdef __cplusplus

View File

@ -212,11 +212,11 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin
break;
}
pInfo->pBuild = &pInfo->tbs[buildIdx];
pInfo->pProbe = &pInfo->tbs[probeIdx];
pInfo->build = &pInfo->tbs[buildIdx];
pInfo->probe = &pInfo->tbs[probeIdx];
pInfo->pBuild->downStreamIdx = buildIdx;
pInfo->pProbe->downStreamIdx = probeIdx;
pInfo->build->downStreamIdx = buildIdx;
pInfo->probe->downStreamIdx = probeIdx;
}
static int32_t mJoinBuildResColMap(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode) {
@ -231,7 +231,7 @@ static int32_t mJoinBuildResColMap(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhys
FOREACH(pNode, pJoinNode->pTargets) {
STargetNode* pTarget = (STargetNode*)pNode;
SColumnNode* pCol = (SColumnNode*)pTarget->pExpr;
if (pCol->dataBlockId == pInfo->pBuild->blkId) {
if (pCol->dataBlockId == pInfo->build->blkId) {
pInfo->pResColMap[i] = 1;
}
@ -264,43 +264,19 @@ static int32_t mJoinInitBufPages(SMJoinOperatorInfo* pInfo) {
return mJoinAddPageToBufList(pInfo->pRowBufs);
}
static int32_t mJoinDoRetrieve(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTbCtx) {
bool retrieveCont = false;
int32_t code = TSDB_CODE_SUCCESS;
do {
SSDataBlock* pBlock = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTbCtx->pTbInfo->downStreamIdx);
pTbCtx->dsInitDone = true;
if (NULL == pBlock) {
retrieveCont = false;
code = (*pJoin->joinFps.handleTbFetchDoneFp)(pJoin, pTbCtx);
} else {
code = (*pTbCtx->blkFetchedFp)(pJoin, pTbCtx, pBlock, &retrieveCont);
}
} while (retrieveCont || TSDB_CODE_SUCCESS != code);
return code;
}
static int32_t mJoinInitMergeCtx(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx;
SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx;
pProbeCtx->type = E_JOIN_TB_PROBE;
pBuildCtx->type = E_JOIN_TB_BUILD;
pCtx->tsJoinCtx.pProbeCtx = pProbeCtx;
pCtx->tsJoinCtx.pBuildCtx = pBuildCtx;
pCtx->hashCan = pJoin->probe->eqNum > 0;
pCtx->joinPhase = E_JOIN_PHASE_RETRIEVE;
pCtx->probeEqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows));
pCtx->probeCreatedBlks = taosArrayInit(8, POINTER_BYTES);
pCtx->outputCtx.hashCan = pProbeCtx->pTbInfo->eqNum > 0;
pCtx->outputCtx.pGrpResList = taosArrayInit(MJOIN_DEFAULT_BLK_ROWS_NUM, sizeof(SGrpPairRes));
if (NULL == pCtx->outputCtx.pGrpResList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCtx->buildEqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows));
pCtx->buildCreatedBlks = taosArrayInit(8, POINTER_BYTES);
if (pJoin->pFPreFilter) {
pCtx->outputCtx.cartCtx.pResBlk = createOneDataBlock(pJoin->pRes);
@ -314,11 +290,6 @@ static int32_t mJoinInitMergeCtx(SOperatorInfo* pOperator, SMJoinOperatorInfo* p
if (!pCtx->outputCtx.hashCan && NULL == pJoin->pFPreFilter) {
pCtx->outputCtx.cartCtx.appendRes = true;
}
pCtx->outputCtx.cartCtx.firstColNum = pProbeCtx->pTbInfo->finNum;
pCtx->outputCtx.cartCtx.pFirstCols = pProbeCtx->pTbInfo->finCols;
pCtx->outputCtx.cartCtx.secondColNum = pBuildCtx->pTbInfo->finNum;
pCtx->outputCtx.cartCtx.pSecondCols = pBuildCtx->pTbInfo->finCols;
return TSDB_CODE_SUCCESS;
}
@ -375,84 +346,6 @@ static int32_t mJoinAddBlkToList(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pCtx
return TSDB_CODE_SUCCESS;
}
static int32_t mLeftJoinProbeFetchDone(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pProbeCtx) {
pProbeCtx->dsFetchDone = true;
return TSDB_CODE_SUCCESS;
}
static int32_t mLeftJoinBuildFetchDone(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pBuildCtx) {
pBuildCtx->dsFetchDone = true;
return TSDB_CODE_SUCCESS;
}
static int32_t mLeftJoinProbeBlkFetched(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pProbeCtx, SSDataBlock* pBlock, bool* retrieveCont) {
int32_t code = mJoinAddBlkToList(pJoin, pProbeCtx, pBlock);
if (code) {
return code;
}
*retrieveCont = false;
return TSDB_CODE_SUCCESS;
}
static int32_t mLeftJoinBuildBlkFetched(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pBuildCtx, SSDataBlock* pBlock, bool* retrieveCont) {
SMJoinTableCtx* pProbeCtx = &pJoin->ctx.mergeCtx.probeTbCtx;
if (pProbeCtx->blkNum <= 0) {
*retrieveCont = false;
return TSDB_CODE_SUCCESS;
}
SColumnInfoData* tsCol = taosArrayGet(pBlock, pBuildCtx->pTbInfo->primCol->srcSlot);
int64_t lastTs = *((int64_t*)tsCol->pData + pBlock->info.rows - 1);
if (pProbeCtx->blkCurTs > lastTs) {
*retrieveCont = true;
} else {
int32_t code = mJoinAddBlkToList(pJoin, pBuildCtx, pBlock);
if (code) {
return code;
}
if (pProbeCtx->blkCurTs == lastTs && lastTs == *(int64_t*)tsCol->pData) {
*retrieveCont = true;
} else {
*retrieveCont = false;
}
}
return TSDB_CODE_SUCCESS;
}
static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx;
SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx;
if ((!pProbeCtx->dsFetchDone) && MJOIN_TB_LOW_BLK(pProbeCtx)) {
int32_t code = mJoinDoRetrieve(pOperator, pProbeCtx, pJoin->pProbe);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
}
if ((pProbeCtx->blkNum > 0 || MJOIN_DS_NEED_INIT(pOperator, pBuildCtx)) && (!pBuildCtx->dsFetchDone) && MJOIN_TB_LOW_BLK(pBuildCtx)) {
int32_t code = mJoinDoRetrieve(pJoin, pBuildCtx, pJoin->pBuild);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
}
if (pProbeCtx->pHeadBlk) {
pJoin->ctx.mergeCtx.joinPhase = E_JOIN_PHASE_SPLIT;
return true;
}
mJoinSetDone(pOperator);
return false;
}
static FORCE_INLINE void mLefeJoinUpdateTsJoinCtx(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx, SMJoinTableCtx* pBuildCtx) {
pCtx->probeRowNum = pProbeCtx->pCurrBlk->pBlk->info.rows;
pCtx->buildRowNum = pBuildCtx->pCurrBlk->pBlk->info.rows;
@ -464,176 +357,6 @@ static FORCE_INLINE void mLefeJoinUpdateTsJoinCtx(SMJoinTsJoinCtx* pCtx, SMJoinT
pCtx->buildEndTs = (int64_t*)buildCol->pData + pCtx->buildRowNum - 1;
}
static bool mJoinProbeMoveToNextBlk(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx) {
if (NULL == pProbeCtx->pCurrBlk->pNext) {
pProbeCtx->blkIdx++;
return false;
}
pProbeCtx->pCurrBlk = pProbeCtx->pCurrBlk->pNext;
pProbeCtx->blkIdx++;
pCtx->probeRowNum = pProbeCtx->pCurrBlk->pBlk->info.rows;
SColumnInfoData* probeCol = taosArrayGet(pProbeCtx->pCurrBlk->pBlk, pProbeCtx->pTbInfo->primCol->srcSlot);
pCtx->probeTs = (int64_t*)probeCol->pData;
pCtx->probeEndTs = (int64_t*)probeCol->pData + pCtx->probeRowNum - 1;
pProbeCtx->blkRowIdx = 0;
return true;
}
static void mJoinLeftJoinAddBlkToGrp(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx, SMJoinTableCtx* pBuildCtx) {
FIN_SAME_TS_GRP();
int32_t rowNum = pProbeCtx->pCurrBlk->pBlk->info.rows - pProbeCtx->blkRowIdx;
if (pCtx->nextProbeRow && pCtx->inDiffTsGrp) {
pCtx->currGrpPair->prbIn.rowNum += rowNum;
} else {
pCtx->inDiffTsGrp = true;
START_NEW_GRP(pCtx);
pCtx->currGrpPair.prbIn.pBeginBlk = pProbeCtx->pCurrBlk;
pCtx->currGrpPair.prbIn.rowBeginIdx = pProbeCtx->blkRowIdx;
pCtx->currGrpPair->prbIn.rowNum = rowNum;
}
pCtx->nextProbeRow = true;
}
static bool mJoinBuildMoveToNextBlk(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pBuildCtx, SMJoinTableCtx* pProbeCtx) {
bool contLoop = false;
bool res = true;
pCtx->nextProbeRow = false;
do {
if (NULL == pBuildCtx->pCurrBlk->pNext) {
pBuildCtx->blkIdx++;
return false;
}
pBuildCtx->pCurrBlk = pBuildCtx->pCurrBlk->pNext;
pBuildCtx->blkIdx++;
pCtx->buildRowNum = pBuildCtx->pCurrBlk->pBlk->info.rows;
SColumnInfoData* buildCol = taosArrayGet(pBuildCtx->pCurrBlk->pBlk, pBuildCtx->pTbInfo->primCol->srcSlot);
pCtx->buildTs = (int64_t*)buildCol->pData;
pCtx->buildEndTs = (int64_t*)buildCol->pData + pCtx->buildRowNum - 1;
pBuildCtx->blkRowIdx = 0;
do {
if (*pCtx->buildTs > pCtx->probeTs[pProbeCtx->blkRowIdx]) {
mJoinLeftJoinAddBlkToGrp(pJoin, pCtx, pProbeCtx, pBuildCtx);
contLoop = mJoinProbeMoveToNextBlk(pCtx, pProbeCtx);
} else if (pCtx->probeTs[pProbeCtx->blkRowIdx] > *pCtx->buildEndTs) {
contLoop = true;
pBuildCtx->pCurrBlk->inUse = false;
break;
} else {
contLoop = false;
res = true;
}
} while (contLoop);
} while (contLoop);
return res;
}
static bool mLeftJoinSplitGrpImpl(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
SMJoinTsJoinCtx* pCtx = &pJoin->ctx.mergeCtx.tsJoinCtx;
SMJoinOutputCtx* pOutCtx = &pJoin->ctx.mergeCtx.outputCtx;
SMJoinTableCtx* pProbeCtx = &pJoin->ctx.mergeCtx.probeTbCtx;
SMJoinTableCtx* pBuildCtx = &pJoin->ctx.mergeCtx.buildTbCtx;
mLefeJoinUpdateTsJoinCtx(pCtx, pProbeCtx, pBuildCtx);
for (; pProbeCtx->blkIdx < pProbeCtx->blkNum; mJoinProbeMoveToNextBlk(pCtx, pProbeCtx)) {
if (*pCtx->buildTs > *pCtx->probeEndTs) {
mJoinLeftJoinAddBlkToGrp(pJoin, pCtx, pProbeCtx, pBuildCtx);
continue;
} else if (*pCtx->probeTs > *pCtx->buildEndTs) {
if (!mJoinBuildMoveToNextBlk(pJoin, pCtx, pBuildCtx, pProbeCtx)) {
break;
//retrieve build
}
}
for (; pProbeCtx->blkRowIdx < pCtx->probeRowNum; ++pProbeCtx->blkRowIdx) {
if (pCtx->pLastGrpPair && pCtx->pLastGrpPair->sameTsGrp
&& pCtx->probeTs[pProbeCtx->blkRowIdx] == pCtx->pLastGrpPair->prbIn.grpLastTs) {
pCtx->pLastGrpPair->prbIn.rowNum++;
SET_SAME_TS_GRP_HJOIN(pCtx->pLastGrpPair, pOutCtx);
continue;
}
for (; pBuildCtx->blkIdx < pBuildCtx->blkNum; mJoinBuildMoveToNextBlk(pJoin, pCtx, pBuildCtx, pProbeCtx)) {
for (; pBuildCtx->blkRowIdx < pCtx->buildRowNum; ++pBuildCtx->blkRowIdx) {
if (pCtx->probeTs[pProbeCtx->blkRowIdx] > pCtx->buildTs[pBuildCtx->blkRowIdx]) {
FIN_SAME_TS_GRP(pCtx, pOutCtx, true);
FIN_DIFF_TS_GRP(pCtx, pOutCtx, false);
pCtx->nextProbeRow = false;
continue;
} else if (pCtx->probeTs[pProbeCtx->blkRowIdx] == pCtx->buildTs[pBuildCtx->blkRowIdx]) {
FIN_DIFF_TS_GRP(pCtx, pOutCtx, false);
if (pCtx->inSameTsGrp) {
pCtx->currGrpPair.bldIn.rowNum++;
} else {
pCtx->inSameTsGrp = true;
START_NEW_GRP(pCtx);
pCtx->currGrpPair.bldIn.pBeginBlk = pBuildCtx->pCurrBlk;
pCtx->currGrpPair.bldIn.rowBeginIdx = pBuildCtx->blkRowIdx;
pCtx->currGrpPair.bldIn.rowNum = 1;
pCtx->currGrpPair.prbIn.pBeginBlk = pProbeCtx->pCurrBlk;
pCtx->currGrpPair.prbIn.grpLastTs = pCtx->probeTs[pProbeCtx->blkRowIdx];
pCtx->currGrpPair.prbIn.rowBeginIdx = pProbeCtx->blkRowIdx;
pCtx->currGrpPair.prbIn.rowNum = 1;
}
pCtx->nextProbeRow = false;
} else {
FIN_SAME_TS_GRP(pCtx, pOutCtx, true);
if (pCtx->inDiffTsGrp) {
pCtx->currGrpPair.prbIn.rowNum++;
} else {
pCtx->inDiffTsGrp = true;
START_NEW_GRP(pCtx);
pCtx->currGrpPair.prbIn.pBeginBlk = pProbeCtx->pCurrBlk;
pCtx->currGrpPair.prbIn.rowBeginIdx = pProbeCtx->blkRowIdx;
pCtx->currGrpPair.prbIn.rowNum = 1;
}
pCtx->nextProbeRow = true;
break;
}
}
// end of single build table
if (pCtx->nextProbeRow) {
break;
}
}
// end of all build tables
if (pCtx->nextProbeRow) {
continue;
}
if (pCtx->inSameTsGrp) {
FIN_SAME_TS_GRP(pCtx, pOutCtx, pBuildCtx->dsFetchDone);
}
break;
}
// end of single probe table
if (pCtx->nextProbeRow) {
continue;
}
break;
}
// end of all probe tables
FIN_DIFF_TS_GRP(pCtx, pOutCtx, pBuildCtx->dsFetchDone);
return true;
}
static FORCE_INLINE void mJoinResetTsJoinCtx(SMJoinTsJoinCtx* pCtx) {
pCtx->inSameTsGrp = false;
pCtx->inDiffTsGrp = false;
@ -641,25 +364,6 @@ static FORCE_INLINE void mJoinResetTsJoinCtx(SMJoinTsJoinCtx* pCtx) {
pCtx->pLastGrpPair = NULL;
}
static bool mLeftJoinSplitGrp(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
SMJoinTsJoinCtx* pCtx = &pJoin->ctx.mergeCtx.tsJoinCtx;
SMJoinTableCtx* pBuildCtx = &pJoin->ctx.mergeCtx.buildTbCtx;
mJoinResetTsJoinCtx(pCtx);
if (0 == pJoin->ctx.mergeCtx.buildTbCtx.blkNum) {
ASSERTS(pJoin->ctx.mergeCtx.buildTbCtx.dsFetchDone, "left join empty build table while fetch not done");
FIN_SAME_TS_GRP(pCtx, &pJoin->ctx.mergeCtx.outputCtx, true);
FIN_DIFF_TS_GRP(pCtx, &pJoin->ctx.mergeCtx.outputCtx, pJoin->ctx.mergeCtx.buildTbCtx.dsFetchDone);
} else {
mLeftJoinSplitGrpImpl(pOperator, pJoin);
}
pJoin->ctx.mergeCtx.joinPhase = E_JOIN_PHASE_OUTPUT;
return true;
}
static void mLeftJoinCart(SMJoinCartCtx* pCtx) {
int32_t currRows = pCtx->appendRes ? pCtx->pResBlk->info.rows : 0;
@ -698,156 +402,247 @@ static void mLeftJoinCart(SMJoinCartCtx* pCtx) {
pCtx->pResBlk.info.rows += pCtx->firstRowNum * pCtx->secondRowNum;
}
static bool mLeftJoinSameTsOutput(SMJoinOperatorInfo* pJoin, SGrpPairRes* pPair) {
SProbeGrpResIn* pProbeIn = &pPair->prbIn;
SBuildGrpResIn* pBuildIn = &pPair->prbIn;
if (!pPair->outBegin) {
static bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableInfo* ppTb) {
if (!(*ppTb)->dsFetchDone && (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows)) {
(*ppBlk) = getNextBlockFromDownstreamRemain(pJoin->pOperator, (*ppTb)->downStreamIdx);
(*ppTb)->dsInitDone = true;
qDebug("merge join %s table got %" PRId64 " rows block", MJOIN_TBTYPE(ppTb->type), (*ppBlk) ? (*ppBlk)->info.rows : 0);
*pIdx = 0;
if (NULL == (*ppBlk)) {
(*ppTb)->dsFetchDone = true;
}
return ((*ppBlk) == NULL) ? false : true;
}
SMJoinBlkInfo* pBInfo = pProbeIn->pBeginBlk;
return true;
}
static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) {
bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->rowIdx, &pJoin->probe->blk, &pJoin->probe);
bool buildGot = false;
do {
if (pJoin->prevFilter) {
if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->rowIdx, &pJoin->build->blk, &pJoin->build);
}
if (NULL == pJoin->probe->blk) {
mJoinSetDone(pOperator);
return false;
} else if (buildGot && probeGot) {
SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot);
SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot);
if (*((int64_t*)pProbeCol->pData + pJoin->probe->rowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) {
continue;
}
}
break;
} while (true);
return true;
}
static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp,
int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) {
int32_t numRows = pBlock->info.rows;
ASSERT(startPos < numRows);
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId);
int32_t i = startPos;
for (; i < numRows; ++i) {
char* pNextVal = colDataGetData(pCol, i);
if (timestamp != *(int64_t*)pNextVal) {
break;
}
}
int32_t endPos = i;
*pEndPos = endPos;
if (endPos - startPos == 0) {
return 0;
}
SSDataBlock* block = pBlock;
bool createdNewBlock = false;
if (endPos == numRows) {
block = blockDataExtractBlock(pBlock, startPos, endPos - startPos);
taosArrayPush(createdBlocks, &block);
createdNewBlock = true;
}
SRowLocation location = {0};
for (int32_t j = startPos; j < endPos; ++j) {
location.pDataBlock = block;
location.pos = (createdNewBlock ? j - startPos : j);
taosArrayPush(rowLocations, &location);
}
return 0;
}
static void mJoinBuildEqGroups(SOperatorInfo* pOperator, SMJoinTableInfo* pTable, int64_t timestamp, bool* allBlk, bool restart) {
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot);
SMJoinGrpRows* pGrp = NULL;
int32_t
if (restart) {
pGrp = taosArrayGet(pTable->eqGrps, 0);
} else {
pGrp = taosArrayReserve(pTable->eqGrps, 1);
}
pGrp->beginIdx = pTable->rowIdx++;
pGrp->rowsNum = 1;
pGrp->blk = pTable->blk;
for (; pTable->rowIdx < pTable->blk->info.rows; ++pTable->rowIdx) {
char* pNextVal = colDataGetData(pCol, pTable->rowIdx);
if (timestamp == *(int64_t*)pNextVal) {
pGrp->rowsNum++;
continue;
}
return;
}
if (allBlk) {
*allBlk = true;
if (0 == pGrp->beginIdx) {
pGrp->blk = createOneDataBlock(pTable->blk, true);
} else {
if (pPair->bldOut.hashJoin) {
pGrp->blk = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, pGrp->rowsNum - pGrp->beginIdx);
}
taosArrayPush(pTable->createdBlks, &pGrp->blk);
pGrp->beginIdx = 0;
}
}
static int32_t mJoinRetrieveSameTsRows(SOperatorInfo* pOperator, SMJoinTableInfo* pTable, int64_t timestamp) {
SMJoinOperatorInfo* pJoin = pOperator->info;
int32_t endPos = -1;
SSDataBlock* dataBlock = startDataBlock;
bool allBlk = false;
mJoinBuildEqGroups(pOperator, pTable, timestamp, &allBlk, true);
while (allBlk) {
pTable->blk = getNextBlockFromDownstreamRemain(pOperator, pTable->downStreamIdx);
qDebug("merge join %s table got block for same ts, rows:%" PRId64, MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);
pTable->rowIdx = 0;
if (NULL == pTable->blk) {
pTable->dsFetchDone = true;
break;
}
allBlk = false;
mJoinBuildEqGroups(pOperator, pTable, timestamp, &allBlk, false);
}
return 0;
}
static int32_t mJoinEqualCart(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes) {
int32_t code = TSDB_CODE_SUCCESS;
SMJoinOperatorInfo* pJoin = pOperator->info;
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
SSHashObj* rightTableHash = NULL;
bool rightUseBuildTable = false;
if (!pCtx->rowRemains) {
mJoinBuildEqGroups(pOperator, pJoin->probe, timestamp, NULL, true);
mJoinRetrieveSameTsRows(pOperator, pJoin->build, timestamp);
if (pJoinInfo->pColEqualOnConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) {
mergeJoinFillBuildTable(pJoinInfo, rightRowLocations);
pCtx->hashJoin = true;
taosArrayDestroy(rightRowLocations);
rightRowLocations = NULL;
}
}
bool reachThreshold = false;
if (code == TSDB_CODE_SUCCESS) {
mLeftJoinCart(pOperator, pRes, nRows, leftRowLocations, leftRowIdx,
rightRowIdx, pCtx->hashJoin, rightRowLocations, &reachThreshold);
}
if (!reachThreshold) {
mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks,
pCtx->hashJoin, rightRowLocations);
} else {
pJoinInfo->rowCtx.rowRemains = true;
pJoinInfo->rowCtx.ts = timestamp;
pJoinInfo->rowCtx.leftRowLocations = leftRowLocations;
pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks;
pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks;
pJoinInfo->rowCtx.rightRowLocations = rightRowLocations;
}
return TSDB_CODE_SUCCESS;
}
static void mLeftJoinDo(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
SMJoinOperatorInfo* pJoin = pOperator->info;
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
int64_t probeTs = INT64_MIN;
int64_t buildTs = 0;
SColumnInfoData* pBuildCol = NULL;
SColumnInfoData* pProbeCol = NULL;
bool asc = (pJoin->inputOrder == TSDB_ORDER_ASC) ? true : false;
do {
if (pCtx->rowRemains) {
probeTs = buildTs = pCtx->curTs;
} else {
if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx, probeTs)) {
break;
}
pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot);
pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot);
probeTs = *((int64_t*)pProbeCol->pData + pJoin->probe->rowIdx);
buildTs = *((int64_t*)pBuildCol->pData + pCtx->buildIdx);
}
while (pCtx->probeIdx < pJoin->probe->blk->info.rows && pCtx->buildIdx < pJoin->build->blk->info.rows) {
if (probeTs == buildTs) {
mJoinEqualCart(pOperator, probeTs, pRes);
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
return;
}
break;
} else if (LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)) {
pCtx->probeNEqGrps.beginIdx = pCtx->probeIdx;
do {
pCtx->probeNEqGrps.rowsNum++;
probeTs = *((int64_t*)pProbeCol->pData + (++pCtx->probeIdx));
} while (pCtx->probeIdx < pJoin->probe->blk->info.rows && LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs));
mJoinNonEqualCart(pOperator, &pCtx->probeNEqGrps, pRes);
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
return;
}
} else {
for (; pProbeIn->rowBeginIdx < pBInfo->pBlk->info.rows && pProbeIn->rowNum > 0; pProbeIn->rowBeginIdx++, pProbeIn->rowNum--) {
buildTs = *((int64_t*)pBuildCol->pData + (++pCtx->buildIdx));
while (pCtx->buildIdx < pJoin->build->blk->info.rows && LEFT_JOIN_DISCRAD(asc, probeTs, buildTs)) {
buildTs = *((int64_t*)pBuildCol->pData + (++pCtx->buildIdx));
}
}
}
} while (true);
}
static bool mLeftJoinCartOutput(SMJoinOperatorInfo* pJoin, SMJoinOutputCtx* pCtx) {
bool contLoop = false;
SMJoinCartCtx* pCart = &pCtx->cartCtx;
int32_t grpNum = taosArrayGetSize(pCtx->pGrpResList);
int32_t rowsLeft = pCart->pResBlk->info.capacity - pCart->pResBlk->info.rows;
int32_t probeRows = 0;
int32_t buildRows = 0;
bool grpDone = false, brokenBlk = false;
for (; pCtx->grpReadIdx < grpNum && pCart->pResBlk->info.rows <= pCart->resThreshold; pCtx->grpReadIdx++) {
SGrpPairRes* pair = taosArrayGet(pCtx->pGrpResList, pCtx->grpReadIdx);
if (!pair->finishGrp) {
ASSERTS(pCtx->grpReadIdx == grpNum - 1, "unfinished grp not the last");
taosArrayRemoveBatch(pCtx->pGrpResList, 0, pCtx->grpReadIdx - 1, NULL);
pCtx->grpReadIdx = 0;
break;
}
grpDone = false;
pCart->firstOnly = !pair->sameTsGrp;
do {
if (!pair->outBegin) {
probeRows = PRB_CUR_BLK_GRP_ROWS(pair->prbIn.rowNum, pair->prbIn.rowBeginIdx, pair->prbIn.pBeginBlk->pBlk->info.rows);
buildRows = BLD_CUR_BLK_GRP_ROWS(pair->sameTsGrp, pair->bldIn.rowNum, pair->bldIn.rowBeginIdx, pair->bldIn.pBeginBlk->pBlk->info.rows);
pCart->pFirstBlk = pair->prbIn.pBeginBlk;
pCart->firstRowIdx = pair->prbIn.rowBeginIdx;
pair->prbOut.pCurrBlk = pair->prbIn.pBeginBlk;
if (pair->sameTsGrp) {
pCart->pSecondBlk = pair->bldIn.pBeginBlk;
pCart->secondRowIdx = pair->bldIn.rowBeginIdx;
pair->bldOut.pCurrBlk = pair->bldIn.pBeginBlk;
}
pair->outBegin = true;
brokenBlk = false;
} else if (BUILD_TB_BROKEN_BLK(pair->sameTsGrp, &pair->bldOut, &pair->bldIn)) {
probeRows = PRB_CUR_BLK_GRP_ROWS(1, pair->prbOut.rowReadIdx, pair->prbOut.pCurrBlk->pBlk.info.rows);
buildRows = BLD_CUR_BLK_GRP_ROWS(pair->bldIn.rowNum - pair->bldOut.rowGReadNum, pair->bldOut.rowReadIdx, pair->bldOut.pCurrBlk->pBlk.info.rows);
pCart->firstRowIdx = pair->prbOut.rowReadIdx;
pCart->secondRowIdx = pair->bldOut.rowReadIdx;
brokenBlk = true;
} else {
probeRows = PRB_CUR_BLK_GRP_ROWS(pair->prbIn.rowNum - pair->prbOut.rowGReadNum, pair->prbOut.rowReadIdx, pair->prbOut.pCurrBlk->pBlk.info.rows);
buildRows = BLD_CUR_BLK_GRP_ROWS(pair->bldIn.rowNum - pair->bldOut.rowGReadNum, pair->bldOut.rowReadIdx, pair->bldOut.pCurrBlk->pBlk.info.rows);
pCart->firstRowIdx = pair->prbOut.rowReadIdx;
if (pair->sameTsGrp) {
pCart->secondRowIdx = pair->bldOut.rowReadIdx;
}
brokenBlk = false;
}
int64_t reqNum = probeRows * buildRows;
if (reqNum <= rowsLeft) {
pCart->firstRowNum = probeRows;
pCart->secondRowNum = buildRows;
pair->prbOut.rowGReadNum += probeRows;
if (!brokenBlk) {
pair->prbOut.rowReadIdx = 0;
if (pair->sameTsGrp) {
pair->bldOut.rowGReadNum += buildRows;
pair->bldOut.rowReadIdx = 0;
}
} else {
pair->prbOut.rowReadIdx += probeRows;
}
if (pair->prbOut.rowGReadNum >= pair->prbIn.rowNum) {
grpDone = true;
}
rowsLeft -= reqNum;
} else if (buildRows <= rowsLeft) {
pCart->firstRowNum = brokenBlk ? 1 : (rowsLeft / buildRows);
pair->prbOut.rowGReadNum += pCart->firstRowNum;
pair->prbOut.rowReadIdx = pCart->firstRowIdx + pCart->firstRowNum;
pCart->secondRowNum = buildRows;
pair->bldOut.rowReadIdx = 0;
rowsLeft -= (pCart->firstRowNum * pCart->secondRowNum);
} else {
ASSERT(pair->sameTsGrp);
pCart->firstRowNum = 1;
pCart->secondRowNum = rowsLeft;
pair->bldOut.rowReadIdx = pCart->secondRowIdx + rowsLeft;
rowsLeft = 0;
}
mLeftJoinCart(pCart);
}while ((!grpDone) && pCart->pResBlk->info.rows <= pCart->resThreshold);
if (!grpDone) {
break;
}
}
}
static SSDataBlock* mJoinDoLeftJoin(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
bool contLoop = false;
do {
switch (pJoin->ctx.mergeCtx.joinPhase) {
case E_JOIN_PHASE_RETRIEVE:
contLoop = mLeftJoinRetrieve(pOperator, pJoin);
break;
case E_JOIN_PHASE_SPLIT:
contLoop = mLeftJoinSplitGrp(pOperator, pJoin);
break;
case E_JOIN_PHASE_OUTPUT:
contLoop = mLeftJoinOutput(pOperator, pJoin);
break;
case E_JOIN_PHASE_DONE:
contLoop = false;
break;
}
} while (contLoop);
}
SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoin = pOperator->info;