feat: support left join

This commit is contained in:
dapan1121 2023-12-06 19:22:14 +08:00
parent ee03cbc404
commit 7c0e8b559c
3 changed files with 396 additions and 148 deletions

View File

@ -479,6 +479,42 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
return 0;
}
int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx, int32_t numOfRows) {
if (pDst->info.type != pSrc->info.type || pSrc->reassigned) {
return TSDB_CODE_FAILED;
}
if (numOfRows <= 0) {
return numOfRows;
}
if (IS_VAR_DATA_TYPE(pDst->info.type)) {
memcpy(pDst->varmeta.offset, pSrc->varmeta.offset, sizeof(int32_t) * numOfRows);
if (pDst->varmeta.allocLen < pSrc->varmeta.length) {
char* tmp = taosMemoryRealloc(pDst->pData, pSrc->varmeta.length);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pDst->pData = tmp;
pDst->varmeta.allocLen = pSrc->varmeta.length;
}
pDst->varmeta.length = pSrc->varmeta.length;
if (pDst->pData != NULL && pSrc->pData != NULL) {
memcpy(pDst->pData, pSrc->pData, pSrc->varmeta.length);
}
} else {
memcpy(pDst->nullbitmap, pSrc->nullbitmap, BitmapLen(numOfRows));
if (pSrc->pData != NULL) {
memcpy(pDst->pData, pSrc->pData, pSrc->info.bytes * numOfRows);
}
}
return 0;
}
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { return taosArrayGetSize(pBlock->pDataBlock); }
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; }

View File

@ -30,20 +30,29 @@ typedef enum EJoinPhase {
E_JOIN_PHASE_RETRIEVE,
E_JOIN_PHASE_SPLIT,
E_JOIN_PHASE_OUTPUT,
E_JOIN_PHASE_
E_JOIN_PHASE_DONE
} EJoinPhase;
typedef struct SMJoinColInfo {
int32_t srcSlot;
int32_t dstSlot;
bool keyCol;
bool vardata;
int32_t* offset;
int32_t bytes;
char* data;
char* bitMap;
} SMJoinColInfo;
typedef struct SMJoinCartCtx {
SSDataBlock* pResBlk;
int32_t firstColNum;
SMJoinColInfo* pFirstCols;
SSDataBlock* pFirstBlk;
int32_t firstRowIdx;
int32_t firstRowNum;
int32_t secondColNum;
SMJoinColInfo* pSecondCols;
SSDataBlock* pSecondBlk;
int32_t secondRowIdx;
int32_t secondRowNum;
} SMJoinCartCtx;
typedef struct SMJoinBlkInfo {
bool cloned;
@ -66,6 +75,10 @@ typedef struct SMJoinTableInfo {
SMJoinColInfo* primCol;
char* primData;
int32_t finNum;
SMJoinColInfo* finCols;
int32_t keyNum;
SMJoinColInfo* keyCols;
char* keyBuf;
@ -91,41 +104,61 @@ typedef struct SMJoinTsJoinCtx {
int64_t* buildEndTs;
bool inSameTsGrp;
bool inDiffTsGrp;
SGrpPairCtx* pLastGrpPairCtx;
SGrpPairCtx currGrpPairCtx;
bool nextProbeRow;
SGrpPairRes* pLastGrpPair;
SGrpPairRes currGrpPair;
} SMJoinTsJoinCtx;
typedef struct SBuildGrpCtx {
typedef struct SBuildGrpResIn {
bool multiBlkGrp;
SMJoinBlkInfo* grpBeginBlk;
int32_t grpRowBeginIdx;
int32_t grpRowNum;
} SBuildGrpResIn;
typedef struct SBuildGrpResOut {
bool hashJoin;
SSHashObj* pGrpHash;
int32_t grpRowReadIdx;
int32_t grpRowGReadIdx;
} SBuildGrpResOut;
typedef struct SProbeGrpResIn {
bool allRowsGrp;
SMJoinBlkInfo* grpBeginBlk;
int32_t grpRowBeginIdx;
int32_t grpRowNum;
} SBuildGrpCtx;
int64_t grpLastTs;
} SProbeGrpResIn;
typedef struct SProbeGrpCtx {
typedef struct SProbeGrpResOut {
int32_t grpRowReadIdx;
int32_t grpRowBeginIdx;
int32_t grpRowNum;
} SProbeGrpCtx;
} SProbeGrpResOut;
typedef struct SGrpPairCtx {
typedef struct SGrpPairRes {
bool sameTsGrp;
bool finishGrp;
SBuildGrpCtx buildGrp;
SProbeGrpCtx probeGrp;
} SGrpPairCtx;
SProbeGrpResIn probeIn;
SBuildGrpResIn buildIn;
/* KEEP THIS PART AT THE END */
bool outBegin;
SBuildGrpResOut buildOut;
SProbeGrpResOut probeOut;
/* KEEP THIS PART AT THE END */
} SGrpPairRes;
#define GRP_PAIR_INIT_SIZE (sizeof(SGrpPairRes) - sizeof(bool) - sizeof(SBuildGrpResOut) - sizeof(SProbeGrpResOut))
typedef struct SMJoinOutputCtx {
int32_t grpReadIdx;
int32_t grpWriteIdx;
SArray* pGrpList;
SMJoinCartCtx cartCtx;
SArray* pGrpResList;
} SMJoinOutputCtx;
typedef struct SMJoinTableCtx {
EJoinTableType type;
void* blkFetchedFp;
SMJoinTableInfo* pTbInfo;
bool dsInitDone;
bool dsFetchDone;
@ -197,39 +230,33 @@ typedef struct SMJoinOperatorInfo {
SMJoinExecInfo execInfo;
} SMJoinOperatorInfo;
#define MJOIN_DOWNSTREAM_NEED_INIT(_pOp) ((_pOp)->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)(_pOp)->pOperatorGetParam->value)->initDownstream)
#define MJOIN_DS_REQ_INIT(_pOp) ((_pOp)->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)(_pOp)->pOperatorGetParam->value)->initDownstream)
#define MJOIN_DS_NEED_INIT(_pOp, _tbctx) (MJOIN_DS_REQ_INIT(_pOp) && (!(_tbctx)->dsInitDone))
#define MJOIN_TB_LOW_BLK(_tbctx) ((_tbctx)->blkNum <= 0 || ((_tbctx)->blkNum == 1 && (_tbctx)->pHeadBlk->cloned))
#define FIN_SAME_TS_GRP() do { \
if (inSameTsGrp) { \
grpPairCtx.sameTsGrp = true; \
grpPairCtx.finishGrp = true; \
grpPairCtx.probeGrp.grpRowBeginIdx = pProbeCtx->blkRowIdx; \
grpPairCtx.probeGrp.grpRowNum = 1; \
inSameTsGrp = false; \
pLastGrpPairCtx = taosArrayPush(pCtx->grpCtx.pGrpList, &grpPairCtx); \
#define START_NEW_GRP(_ctx) memset(&(_ctx)->currGrpPair, 0, GRP_PAIR_INIT_SIZE)
#define FIN_SAME_TS_GRP(_ctx, _octx, _done) do { \
if ((_ctx)->inSameTsGrp) { \
(_ctx)->currGrpPair.sameTsGrp = true; \
(_ctx)->currGrpPair.finishGrp = (_done); \
(_ctx)->inSameTsGrp = false; \
(_ctx)->pLastGrpPair = taosArrayPush((_octx)->pGrpResList, &(_ctx)->currGrpPair); \
} \
} while (0)
#define PAUSE_SAME_TS_GRP() do { \
if (inSameTsGrp) { \
grpPairCtx.sameTsGrp = true; \
grpPairCtx.finishGrp = false; \
grpPairCtx.probeGrp.grpRowBeginIdx = pProbeCtx->blkRowIdx; \
grpPairCtx.probeGrp.grpRowNum = 1; \
inSameTsGrp = false; \
pLastGrpPairCtx = taosArrayPush(pCtx->grpCtx.pGrpList, &grpPairCtx); \
} \
} while (0)
#define FIN_DIFF_TS_GRP() do { \
if (inDiffTsGrp) { \
grpPairCtx.sameTsGrp = false; \
grpPairCtx.finishGrp = true; \
grpPairCtx.probeGrp.grpRowBeginIdx = pProbeCtx->blkRowIdx; \
grpPairCtx.probeGrp.grpRowNum = 1; \
inDiffTsGrp = false; \
pLastGrpPairCtx = taosArrayPush(pCtx->grpCtx.pGrpList, &grpPairCtx); \
#define FIN_DIFF_TS_GRP(_ctx, _octx, _done) do { \
if ((_ctx)->inDiffTsGrp) { \
(_ctx)->currGrpPair.sameTsGrp = false; \
(_ctx)->currGrpPair.finishGrp = true; \
(_ctx)->currGrpPair.probeIn.allRowsGrp= (_done); \
(_ctx)->inDiffTsGrp = false; \
(_ctx)->pLastGrpPair = taosArrayPush((_octx)->pGrpResList, &(_ctx)->currGrpPair); \
} else if (_done) { \
(_ctx)->currGrpPair.sameTsGrp = false; \
(_ctx)->currGrpPair.finishGrp = true; \
(_ctx)->currGrpPair.probeIn.grpRowBeginIdx = (_ctx)->pProbeCtx->blkRowIdx; \
(_ctx)->currGrpPair.probeIn.allRowsGrp = true; \
} \
} while (0)

View File

@ -266,93 +266,147 @@ static int32_t mJoinInitBufPages(SMJoinOperatorInfo* pInfo) {
return mJoinAddPageToBufList(pInfo->pRowBufs);
}
static int32_t mJoinDoRetrieve(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTbCtx, SMJoinTableInfo* pTbInfo) {
static int32_t mJoinDoRetrieve(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTbCtx) {
bool retrieveCont = false;
int32_t code = TSDB_CODE_SUCCESS;
do {
SSDataBlock* pBlock = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTbInfo->downStreamIdx);
SSDataBlock* pBlock = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTbCtx->pTbInfo->downStreamIdx);
pTbCtx->dsInitDone = true;
if (NULL == pBlock) {
retrieveCont = false;
code = (*pJoin->joinFps.handleTbFetchDoneFp)(pJoin, pTbCtx, pTbInfo);
code = (*pJoin->joinFps.handleTbFetchDoneFp)(pJoin, pTbCtx);
} else {
code = (*pJoin->joinFps.handleBlkFetchedFp)(pJoin, pTbCtx, pTbInfo, pBlock, &retrieveCont);
code = (*pTbCtx->blkFetchedFp)(pJoin, pTbCtx, pBlock, &retrieveCont);
}
} while (retrieveCont || TSDB_CODE_SUCCESS != code);
return code;
}
static FORCE_INLINE bool mJoinBuildTbNeedRetrieve(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pBuildCtx) {
if ((!pBuildCtx->grpRetrieved) && (!pBuildCtx->dsFetchDone)) {
return true;
}
return false;
}
static FORCE_INLINE bool mJoinProbeTbNeedRetrieve(SOperatorInfo* pOperator, SMJoinTableCtx* pProbeCtx, SMJoinTableCtx* pBuildCtx) {
SMJoinOperatorInfo* pJoin = pOperator->info;
if (MJOIN_DOWNSTREAM_NEED_INIT(pOperator) && !pBuildCtx->dsInitDone) {
return true;
}
if (((!pProbeCtx->grpRetrieved) && (!pProbeCtx->dsFetchDone)) && (pBuildCtx->grpRetrieved || pJoin->ctx.flags.retrieveAfterBuildDone))) {
return true;
}
return false;
}
static bool mJoinOpenMergeJoin(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx;
SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx;
int32_t code = mJoinDoRetrieve(pOperator, pBuildCtx, pJoin->pBuild);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
if (pBuildCtx->dsFetchDone && !pJoin->ctx.flags.retrieveAfterBuildDone) {
return false;
}
code = mJoinDoRetrieve(pOperator, pProbeCtx, pJoin->pProbe);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
return true;
}
static void mJoinInitJoinCtx(SMJoinOperatorInfo* pJoin) {
blkFetchedFp;
}
static void mJoinLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
static void mJoinSetDone(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
if (pOperator->pDownstreamGetParams) {
freeOperatorParam(pOperator->pDownstreamGetParams[0], OP_GET_PARAM);
freeOperatorParam(pOperator->pDownstreamGetParams[1], OP_GET_PARAM);
pOperator->pDownstreamGetParams[0] = NULL;
pOperator->pDownstreamGetParams[1] = NULL;
}
}
static int32_t mJoinAddBlkToList(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pCtx, SSDataBlock* pBlock) {
SMJoinBlkInfo* pNew = taosMemoryCalloc(1, sizeof(SMJoinBlkInfo));
if (NULL == pNew) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pNew->pBlk = pBlock;
if (NULL == pCtx->pTailBlk) {
pCtx->pTailBlk = pCtx->pHeadBlk = pNew;
pCtx->pCurrBlk = pCtx->pHeadBlk;
pCtx->blkIdx = 0;
pCtx->blkRowIdx = 0;
pCtx->blkNum = 1;
if (E_JOIN_TB_PROBE == pCtx->type) {
SColumnInfoData* probeCol = taosArrayGet(pCtx->pCurrBlk->pBlk, pCtx->pTbInfo->primCol->srcSlot);
pCtx->blkCurTs = *(int64_t*)probeCol->pData;
}
} else {
pCtx->pTailBlk->pNext = pNew;
pCtx->blkNum++;
if (E_JOIN_TB_PROBE == pCtx->type) {
SMJoinTsJoinCtx* pTsCtx = &pJoin->ctx.mergeCtx.tsJoinCtx;
pCtx->blkCurTs = pTsCtx->probeTs[pCtx->blkRowIdx];
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t mLeftJoinProbeFetchDone(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pProbeCtx) {
pProbeCtx->dsFetchDone = true;
return TSDB_CODE_SUCCESS;
}
static int32_t mLeftJoinBuildFetchDone(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pBuildCtx) {
pBuildCtx->dsFetchDone = true;
return TSDB_CODE_SUCCESS;
}
static int32_t mLeftJoinProbeBlkFetched(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pProbeCtx, SSDataBlock* pBlock, bool* retrieveCont) {
int32_t code = mJoinAddBlkToList(pJoin, pProbeCtx, pBlock);
if (code) {
return code;
}
*retrieveCont = false;
return TSDB_CODE_SUCCESS;
}
static int32_t mLeftJoinBuildBlkFetched(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pBuildCtx, SSDataBlock* pBlock, bool* retrieveCont) {
SMJoinTableCtx* pProbeCtx = &pJoin->ctx.mergeCtx.probeTbCtx;
if (pProbeCtx->blkNum <= 0) {
*retrieveCont = false;
return TSDB_CODE_SUCCESS;
}
SColumnInfoData* tsCol = taosArrayGet(pBlock, pBuildCtx->pTbInfo->primCol->srcSlot);
int64_t lastTs = *((int64_t*)tsCol->pData + pBlock->info.rows - 1);
if (pProbeCtx->blkCurTs > lastTs) {
*retrieveCont = true;
} else {
int32_t code = mJoinAddBlkToList(pJoin, pBuildCtx, pBlock);
if (code) {
return code;
}
if (pProbeCtx->blkCurTs == lastTs && lastTs == *(int64_t*)tsCol->pData) {
*retrieveCont = true;
} else {
*retrieveCont = false;
}
}
return TSDB_CODE_SUCCESS;
}
static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx;
SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx;
if ((!pProbeCtx->dsFetchDone) && MJOIN_TB_LOW_BLK(pProbeCtx)) {
int32_t code = mJoinDoRetrieve(pOperator, pProbeCtx, pJoin->pProbe);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
if (NULL == pProbeCtx->pHeadBlk) {
return;
}
code = mJoinDoRetrieve(pJoin, pBuildCtx, pJoin->pBuild);
if ((pProbeCtx->blkNum > 0 || MJOIN_DS_NEED_INIT(pOperator, pBuildCtx)) && (!pBuildCtx->dsFetchDone) && MJOIN_TB_LOW_BLK(pBuildCtx)) {
int32_t code = mJoinDoRetrieve(pJoin, pBuildCtx, pJoin->pBuild);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
pJoin->ctx.mergeCtx.joinPhase = E_JOIN_PHASE_TS_JOIN;
}
static FORCE_INLINE void mJoinLefeJoinUpdateTsJoinCtx(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx, SMJoinTableCtx* pBuildCtx) {
if (pProbeCtx->pHeadBlk) {
pJoin->ctx.mergeCtx.joinPhase = E_JOIN_PHASE_SPLIT;
return true;
}
mJoinSetDone(pOperator);
return false;
}
static FORCE_INLINE void mLefeJoinUpdateTsJoinCtx(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx, SMJoinTableCtx* pBuildCtx) {
pCtx->probeRowNum = pProbeCtx->pCurrBlk->pBlk->info.rows;
pCtx->buildRowNum = pBuildCtx->pCurrBlk->pBlk->info.rows;
SColumnInfoData* probeCol = taosArrayGet(pProbeCtx->pCurrBlk->pBlk, pProbeCtx->pTbInfo->primCol->srcSlot);
@ -363,13 +417,14 @@ static FORCE_INLINE void mJoinLefeJoinUpdateTsJoinCtx(SMJoinTsJoinCtx* pCtx, SMJ
pCtx->buildEndTs = (int64_t*)buildCol->pData + pCtx->buildRowNum - 1;
}
static bool mJoinMoveToNextProbeTable(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx) {
static bool mJoinProbeMoveToNextBlk(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx) {
if (NULL == pProbeCtx->pCurrBlk->pNext) {
pProbeCtx->blkIdx++;
return false;
}
pProbeCtx->pCurrBlk = pProbeCtx->pCurrBlk->pNext;
pProbeCtx->blkIdx++;
pCtx->probeRowNum = pProbeCtx->pCurrBlk->pBlk->info.rows;
SColumnInfoData* probeCol = taosArrayGet(pProbeCtx->pCurrBlk->pBlk, pProbeCtx->pTbInfo->primCol->srcSlot);
pCtx->probeTs = (int64_t*)probeCol->pData;
@ -379,17 +434,38 @@ static bool mJoinMoveToNextProbeTable(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pPr
return true;
}
static bool mJoinMoveToNextBuildTable(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pBuildCtx, SMJoinTableCtx* pProbeCtx) {
static void mJoinLeftJoinAddBlkToGrp(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx, SMJoinTableCtx* pBuildCtx) {
FIN_SAME_TS_GRP();
int32_t rowNum = pProbeCtx->pCurrBlk->pBlk->info.rows - pProbeCtx->blkRowIdx;
if (pCtx->nextProbeRow && pCtx->inDiffTsGrp) {
pCtx->currGrpPair->probeIn.grpRowNum += rowNum;
} else {
pCtx->inDiffTsGrp = true;
START_NEW_GRP(pCtx);
pCtx->currGrpPair.probeIn.grpBeginBlk = pProbeCtx->pCurrBlk;
pCtx->currGrpPair.probeIn.grpRowBeginIdx = pProbeCtx->blkRowIdx;
pCtx->currGrpPair->probeIn.grpRowNum = rowNum;
}
pCtx->nextProbeRow = true;
}
static bool mJoinBuildMoveToNextBlk(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pBuildCtx, SMJoinTableCtx* pProbeCtx) {
bool contLoop = false;
bool res = false;
bool res = true;
pCtx->nextProbeRow = false;
do {
if (pBuildCtx->pCurrBlk->pNext) {
if (NULL == pBuildCtx->pCurrBlk->pNext) {
pBuildCtx->blkIdx++;
return false;
}
pBuildCtx->pCurrBlk = pBuildCtx->pCurrBlk->pNext;
pBuildCtx->blkIdx++;
pCtx->buildRowNum = pBuildCtx->pCurrBlk->pBlk->info.rows;
SColumnInfoData* buildCol = taosArrayGet(pBuildCtx->pCurrBlk->pBlk, pBuildCtx->pTbInfo->primCol->srcSlot);
pCtx->buildTs = (int64_t*)buildCol->pData;
@ -399,8 +475,9 @@ static bool mJoinMoveToNextBuildTable(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx
do {
if (*pCtx->buildTs > pCtx->probeTs[pProbeCtx->blkRowIdx]) {
mJoinLeftJoinAddBlkToGrp(pJoin, pCtx, pProbeCtx, pBuildCtx);
contLoop = mJoinMoveToNextProbeTable(pCtx, pProbeCtx);
contLoop = mJoinProbeMoveToNextBlk(pCtx, pProbeCtx);
} else if (pCtx->probeTs[pProbeCtx->blkRowIdx] > *pCtx->buildEndTs) {
contLoop = true;
break;
} else {
contLoop = false;
@ -412,75 +489,90 @@ static bool mJoinMoveToNextBuildTable(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx
return res;
}
static void mJoinLeftJoinSplitGrp(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
static bool mLeftJoinSplitGrpImpl(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
SMJoinTsJoinCtx* pCtx = &pJoin->ctx.mergeCtx.tsJoinCtx;
SMJoinOutputCtx* pOutCtx = &pJoin->ctx.mergeCtx.outputCtx;
SMJoinTableCtx* pProbeCtx = &pJoin->ctx.mergeCtx.probeTbCtx;
SMJoinTableCtx* pBuildCtx = &pJoin->ctx.mergeCtx.buildTbCtx;
mJoinLefeJoinUpdateTsJoinCtx(pCtx, pProbeCtx, pBuildCtx);
mLefeJoinUpdateTsJoinCtx(pCtx, pProbeCtx, pBuildCtx);
bool nextRow = false;
for (; pProbeCtx->blkIdx < pProbeCtx->blkNum; mJoinMoveToNextProbeTable(pCtx, pProbeCtx)) {
for (; pProbeCtx->blkIdx < pProbeCtx->blkNum; mJoinProbeMoveToNextBlk(pCtx, pProbeCtx)) {
if (*pCtx->buildTs > *pCtx->probeEndTs) {
mJoinLeftJoinAddBlkToGrp(pJoin, pCtx, pProbeCtx, pBuildCtx);
continue;
} else if (*pCtx->probeTs > *pCtx->buildEndTs) {
if (!mJoinMoveToNextBuildTable(pJoin, pCtx, pBuildCtx, pProbeCtx)) {
if (!mJoinBuildMoveToNextBlk(pJoin, pCtx, pBuildCtx, pProbeCtx)) {
break;
//retrieve build
}
}
for (; pProbeCtx->blkRowIdx < pCtx->probeRowNum; ++pProbeCtx->blkRowIdx) {
for (; pBuildCtx->blkIdx < pBuildCtx->blkNum; mJoinMoveToNextBuildTable(pJoin, pCtx, pBuildCtx, pProbeCtx)) {
if (pCtx->pLastGrpPair && pCtx->pLastGrpPair->sameTsGrp
&& pCtx->probeTs[pProbeCtx->blkRowIdx] == pCtx->pLastGrpPair->probeIn.grpLastTs) {
pCtx->pLastGrpPair->probeIn.grpRowNum++;
continue;
}
for (; pBuildCtx->blkIdx < pBuildCtx->blkNum; mJoinBuildMoveToNextBlk(pJoin, pCtx, pBuildCtx, pProbeCtx)) {
for (; pBuildCtx->blkRowIdx < pCtx->buildRowNum; ++pBuildCtx->blkRowIdx) {
if (pCtx->probeTs[pProbeCtx->blkRowIdx] > pCtx->buildTs[pBuildCtx->blkRowIdx]) {
FIN_SAME_TS_GRP();
FIN_DIFF_TS_GRP();
FIN_SAME_TS_GRP(pCtx, pOutCtx, true);
FIN_DIFF_TS_GRP(pCtx, pOutCtx, false);
pCtx->nextProbeRow = false;
continue;
} else if (pCtx->probeTs[pProbeCtx->blkRowIdx] == pCtx->buildTs[pBuildCtx->blkRowIdx]) {
FIN_DIFF_TS_GRP();
FIN_DIFF_TS_GRP(pCtx, pOutCtx, false);
if (pCtx->inSameTsGrp) {
pCtx->currGrpPairCtx.buildGrp.grpRowNum++;
pCtx->currGrpPair.buildIn.grpRowNum++;
} else {
pCtx->inSameTsGrp = true;
pCtx->currGrpPairCtx.buildGrp.grpRowBeginIdx = pBuildCtx->blkRowIdx;
START_NEW_GRP(pCtx);
pCtx->currGrpPair.buildIn.grpBeginBlk = pBuildCtx->pCurrBlk;
pCtx->currGrpPair.buildIn.grpRowBeginIdx = pBuildCtx->blkRowIdx;
pCtx->currGrpPair.buildIn.grpRowNum = 1;
pCtx->currGrpPair.probeIn.grpBeginBlk = pProbeCtx->pCurrBlk;
pCtx->currGrpPair.probeIn.grpLastTs = pCtx->probeTs[pProbeCtx->blkRowIdx];
pCtx->currGrpPair.probeIn.grpRowBeginIdx = pProbeCtx->blkRowIdx;
pCtx->currGrpPair.probeIn.grpRowNum = 1;
}
pCtx->nextProbeRow = false;
} else {
FIN_SAME_TS_GRP();
FIN_SAME_TS_GRP(pCtx, pOutCtx, true);
if (pCtx->inDiffTsGrp) {
pCtx->currGrpPairCtx.probeGrp.grpRowNum++;
pCtx->currGrpPair.probeIn.grpRowNum++;
} else {
pCtx->inDiffTsGrp = true;
pCtx->currGrpPairCtx.probeGrp.grpRowBeginIdx = pProbeCtx->blkRowIdx;
START_NEW_GRP(pCtx);
pCtx->currGrpPair.probeIn.grpBeginBlk = pProbeCtx->pCurrBlk;
pCtx->currGrpPair.probeIn.grpRowBeginIdx = pProbeCtx->blkRowIdx;
pCtx->currGrpPair.probeIn.grpRowNum = 1;
}
nextRow = true;
pCtx->nextProbeRow = true;
break;
}
}
// end of single build table
if (nextRow) {
if (pCtx->nextProbeRow) {
break;
}
}
// end of all build tables
if (nextRow) {
if (pCtx->nextProbeRow) {
continue;
}
if (pCtx->inSameTsGrp) {
PAUSE_SAME_TS_GRP();
FIN_SAME_TS_GRP(pCtx, pOutCtx, pBuildCtx->dsFetchDone);
}
break;
}
// end of single probe table
if (nextRow) {
if (pCtx->nextProbeRow) {
continue;
}
@ -488,15 +580,105 @@ static void mJoinLeftJoinSplitGrp(SOperatorInfo* pOperator, SMJoinOperatorInfo*
}
// end of all probe tables
FIN_DIFF_TS_GRP();
FIN_DIFF_TS_GRP(pCtx, pOutCtx, pBuildCtx->dsFetchDone);
return true;
}
static FORCE_INLINE void mJoinResetTsJoinCtx(SMJoinTsJoinCtx* pCtx) {
pCtx->inSameTsGrp = false;
pCtx->inDiffTsGrp = false;
pCtx->nextProbeRow = false;
pCtx->pLastGrpPair = NULL;
}
static bool mLeftJoinSplitGrp(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
SMJoinTsJoinCtx* pCtx = &pJoin->ctx.mergeCtx.tsJoinCtx;
SMJoinTableCtx* pBuildCtx = &pJoin->ctx.mergeCtx.buildTbCtx;
mJoinResetTsJoinCtx(pCtx);
if (0 == pJoin->ctx.mergeCtx.buildTbCtx.blkNum) {
ASSERTS(pJoin->ctx.mergeCtx.buildTbCtx.dsFetchDone, "left join empty build table while fetch not done");
FIN_DIFF_TS_GRP(pCtx, &pJoin->ctx.mergeCtx.outputCtx, pJoin->ctx.mergeCtx.buildTbCtx.dsFetchDone);
} else {
mLeftJoinSplitGrpImpl(pOperator, pJoin);
}
pJoin->ctx.mergeCtx.joinPhase = E_JOIN_PHASE_OUTPUT;
return true;
}
static bool mJoinLeftJoinOutput(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
static void mLeftJoinCart(SMJoinCartCtx* pCtx) {
int32_t currRows = pCtx->pResBlk->info.rows;
for (int32_t c = 0; c < pCtx->firstColNum; ++c) {
SMJoinColInfo* pFirstCol = pCtx->pFirstCols + c;
SColumnInfoData* pInCol = taosArrayGet(pCtx->pFirstBlk->pDataBlock, pFirstCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pFirstCol->dstSlot);
for (int32_t r = 0; r < pCtx->firstRowNum; ++r) {
if (colDataIsNull_s(pInCol, pCtx->firstRowIdx + r)) {
colDataSetNItemsNull(pOutCol, currRows + r * pCtx->secondRowNum, pCtx->secondRowNum);
} else {
colDataSetNItems(pOutCol, currRows + r * pCtx->secondRowNum, colDataGetData(pInCol, pCtx->firstRowIdx + r), pCtx->secondRowNum, true);
}
}
}
}
static bool mLeftJoinSameTsOutput(SMJoinOperatorInfo* pJoin, SGrpPairRes* pPair) {
SProbeGrpResIn* pProbeIn = &pPair->probeIn;
SBuildGrpResIn* pBuildIn = &pPair->buildIn;
if (!pPair->outBegin) {
}
SMJoinBlkInfo* pBInfo = pProbeIn->grpBeginBlk;
do {
if (pJoin->prevFilter) {
} else {
if (pPair->buildOut.hashJoin) {
} else {
for (; pProbeIn->grpRowBeginIdx < pBInfo->pBlk->info.rows && pProbeIn->grpRowNum > 0; pProbeIn->grpRowBeginIdx++, pProbeIn->grpRowNum--) {
}
}
}
} while (true);
}
static bool mLeftJoinOutput(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
SMJoinOutputCtx* pCtx = &pJoin->ctx.mergeCtx.outputCtx;
bool contLoop = false;
int32_t grpNum = taosArrayGetSize(pCtx->pGrpResList);
for (; pCtx->grpReadIdx < grpNum; pCtx->grpReadIdx++) {
SGrpPairRes* pPair = taosArrayGet(pCtx->pGrpResList, pCtx->grpReadIdx);
if (!pPair->finishGrp) {
ASSERTS(pCtx->grpReadIdx == grpNum - 1, "unfinished grp not the last");
taosArrayRemoveBatch(pCtx->pGrpResList, 0, pCtx->grpReadIdx - 1, NULL);
pCtx->grpReadIdx = 0;
break;
}
if (pPair->sameTsGrp) {
contLoop = mLeftJoinSameTsOutput(pJoin, pPair);
} else {
contLoop = mLeftJoinDirectOutput(pJoin, pPair);
}
if (!contLoop) {
return false;
}
}
pJoin->ctx.mergeCtx.joinPhase = E_JOIN_PHASE_RETRIEVE;
return true;
}
static SSDataBlock* mJoinDoLeftJoin(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
@ -505,13 +687,16 @@ static SSDataBlock* mJoinDoLeftJoin(SOperatorInfo* pOperator, SMJoinOperatorInfo
do {
switch (pJoin->ctx.mergeCtx.joinPhase) {
case E_JOIN_PHASE_RETRIEVE:
contLoop = mJoinLeftJoinRetrieve(pOperator, pJoin);
contLoop = mLeftJoinRetrieve(pOperator, pJoin);
break;
case E_JOIN_PHASE_SPLIT:
contLoop = mJoinLeftJoinSplitGrp(pOperator, pJoin);
contLoop = mLeftJoinSplitGrp(pOperator, pJoin);
break;
case E_JOIN_PHASE_OUTPUT:
contLoop = mJoinLeftJoinOutput(pOperator, pJoin);
contLoop = mLeftJoinOutput(pOperator, pJoin);
break;
case E_JOIN_PHASE_DONE:
contLoop = false;
break;
}
} while (contLoop);