feat: support left join

This commit is contained in:
dapan1121 2023-12-05 19:27:57 +08:00
parent 4700ebafd1
commit 519622dbf4
2 changed files with 436 additions and 53 deletions

View File

@ -21,6 +21,18 @@ extern "C" {
typedef SSDataBlock* (*joinImplFp)(SOperatorInfo*);
typedef enum EJoinTableType {
E_JOIN_TB_BUILD = 1,
E_JOIN_TB_PROBE
} EJoinTableType;
typedef enum EJoinPhase {
E_JOIN_PHASE_RETRIEVE,
E_JOIN_PHASE_SPLIT,
E_JOIN_PHASE_OUTPUT,
E_JOIN_PHASE_
} EJoinPhase;
typedef struct SMJoinColInfo {
int32_t srcSlot;
int32_t dstSlot;
@ -33,6 +45,18 @@ typedef struct SMJoinColInfo {
} SMJoinColInfo;
typedef struct SMJoinBlkInfo {
bool cloned;
SSDataBlock* pBlk;
void* pNext;
} SMJoinBlkInfo;
typedef struct SMJoinRowInfo {
int64_t blkId;
int32_t rowIdx;
int64_t rowGIdx;
} SMJoinRowInfo;
typedef struct SMJoinTableInfo {
int32_t downStreamIdx;
SOperatorInfo* downStream;
@ -56,8 +80,109 @@ typedef struct SMJoinTableInfo {
bool valColExist;
} SMJoinTableInfo;
typedef struct SMJoinTsJoinCtx {
SMJoinTableCtx* pProbeCtx;
SMJoinTableCtx* pBuildCtx;
int64_t probeRowNum;
int64_t buildRowNum;
int64_t* probeTs;
int64_t* buildTs;
int64_t* probeEndTs;
int64_t* buildEndTs;
bool inSameTsGrp;
bool inDiffTsGrp;
SGrpPairCtx* pLastGrpPairCtx;
SGrpPairCtx currGrpPairCtx;
} SMJoinTsJoinCtx;
typedef struct SBuildGrpCtx {
bool multiBlkGrp;
bool hashJoin;
SSHashObj* pGrpHash;
int32_t grpRowReadIdx;
int32_t grpRowGReadIdx;
int32_t grpRowBeginIdx;
int32_t grpRowNum;
} SBuildGrpCtx;
typedef struct SProbeGrpCtx {
int32_t grpRowReadIdx;
int32_t grpRowBeginIdx;
int32_t grpRowNum;
} SProbeGrpCtx;
typedef struct SGrpPairCtx {
bool sameTsGrp;
bool finishGrp;
SBuildGrpCtx buildGrp;
SProbeGrpCtx probeGrp;
} SGrpPairCtx;
typedef struct SMJoinOutputCtx {
int32_t grpReadIdx;
int32_t grpWriteIdx;
SArray* pGrpList;
} SMJoinOutputCtx;
typedef struct SMJoinTableCtx {
EJoinTableType type;
SMJoinTableInfo* pTbInfo;
bool dsInitDone;
bool dsFetchDone;
int64_t blkCurTs;
int32_t blkRowIdx;
int64_t blkIdx;
int64_t blkNum;
SMJoinBlkInfo* pCurrBlk;
SMJoinBlkInfo* pHeadBlk;
SMJoinBlkInfo* pTailBlk;
} SMJoinTableCtx;
typedef struct SMJoinMergeCtx {
bool hashJoin;
EJoinPhase joinPhase;
int64_t grpCurTs;
SMJoinOutputCtx outputCtx;
SMJoinTsJoinCtx tsJoinCtx;
SMJoinTableCtx buildTbCtx;
SMJoinTableCtx probeTbCtx;
} SMJoinMergeCtx;
typedef struct SMJoinWinCtx {
} SMJoinWinCtx;
typedef struct SMJoinFlowFlags {
bool mergeJoin;
bool windowJoin;
bool preFilter;
bool retrieveAfterBuildDone;
} SMJoinFlowFlags;
typedef struct SMJoinCtx {
SMJoinFlowFlags* pFlags;
union {
SMJoinMergeCtx mergeCtx;
SMJoinWinCtx winCtx;
};
} SMJoinCtx;
typedef struct SMJoinExecInfo {
int64_t buildBlkNum;
int64_t buildBlkRows;
int64_t probeBlkNum;
int64_t probeBlkRows;
int64_t resRows;
int64_t expectRows;
} SMJoinExecInfo;
typedef struct SMJoinOperatorInfo {
SOperatorInfo* pOperator;
int32_t joinType;
int32_t subType;
int32_t inputTsOrder;
SMJoinTableInfo tbs[2];
SMJoinTableInfo* pBuild;
@ -65,16 +190,50 @@ typedef struct SMJoinOperatorInfo {
SSDataBlock* pRes;
int32_t pResColNum;
int8_t* pResColMap;
SArray* pRowBufs;
SFilterInfo* pPreFilter;
SFilterInfo* pFinFilter;
SSHashObj* pKeyHash;
bool keyHashBuilt;
joinImplFp joinFp;
SHJoinCtx ctx;
SHJoinExecInfo execInfo;
SMJoinFuncs joinFps;
SMJoinCtx ctx;
SMJoinExecInfo execInfo;
} SMJoinOperatorInfo;
#define MJOIN_DOWNSTREAM_NEED_INIT(_pOp) ((_pOp)->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)(_pOp)->pOperatorGetParam->value)->initDownstream)
#define FIN_SAME_TS_GRP() do { \
if (inSameTsGrp) { \
grpPairCtx.sameTsGrp = true; \
grpPairCtx.finishGrp = true; \
grpPairCtx.probeGrp.grpRowBeginIdx = pProbeCtx->blkRowIdx; \
grpPairCtx.probeGrp.grpRowNum = 1; \
inSameTsGrp = false; \
pLastGrpPairCtx = taosArrayPush(pCtx->grpCtx.pGrpList, &grpPairCtx); \
} \
} while (0)
#define PAUSE_SAME_TS_GRP() do { \
if (inSameTsGrp) { \
grpPairCtx.sameTsGrp = true; \
grpPairCtx.finishGrp = false; \
grpPairCtx.probeGrp.grpRowBeginIdx = pProbeCtx->blkRowIdx; \
grpPairCtx.probeGrp.grpRowNum = 1; \
inSameTsGrp = false; \
pLastGrpPairCtx = taosArrayPush(pCtx->grpCtx.pGrpList, &grpPairCtx); \
} \
} while (0)
#define FIN_DIFF_TS_GRP() do { \
if (inDiffTsGrp) { \
grpPairCtx.sameTsGrp = false; \
grpPairCtx.finishGrp = true; \
grpPairCtx.probeGrp.grpRowBeginIdx = pProbeCtx->blkRowIdx; \
grpPairCtx.probeGrp.grpRowNum = 1; \
inDiffTsGrp = false; \
pLastGrpPairCtx = taosArrayPush(pCtx->grpCtx.pGrpList, &grpPairCtx); \
} \
} while (0)
#ifdef __cplusplus
}
#endif

View File

@ -266,57 +266,280 @@ static int32_t mJoinInitBufPages(SMJoinOperatorInfo* pInfo) {
return mJoinAddPageToBufList(pInfo->pRowBufs);
}
static SSDataBlock* mJoinHanleMergeJoin(SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoinInfo = pOperator->info;
int32_t nrows = pRes->info.rows;
bool asc = (pJoinInfo->inputOrder == TSDB_ORDER_ASC) ? true : false;
while (1) {
int64_t leftTs = 0;
int64_t rightTs = 0;
if (pJoinInfo->rowCtx.rowRemains) {
leftTs = pJoinInfo->rowCtx.ts;
rightTs = pJoinInfo->rowCtx.ts;
static int32_t mJoinDoRetrieve(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTbCtx, SMJoinTableInfo* pTbInfo) {
bool retrieveCont = false;
int32_t code = TSDB_CODE_SUCCESS;
do {
SSDataBlock* pBlock = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTbInfo->downStreamIdx);
pTbCtx->dsInitDone = true;
if (NULL == pBlock) {
retrieveCont = false;
code = (*pJoin->joinFps.handleTbFetchDoneFp)(pJoin, pTbCtx, pTbInfo);
} else {
bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs);
if (!hasNextTs) {
code = (*pJoin->joinFps.handleBlkFetchedFp)(pJoin, pTbCtx, pTbInfo, pBlock, &retrieveCont);
}
} while (retrieveCont || TSDB_CODE_SUCCESS != code);
return code;
}
static FORCE_INLINE bool mJoinBuildTbNeedRetrieve(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pBuildCtx) {
if ((!pBuildCtx->grpRetrieved) && (!pBuildCtx->dsFetchDone)) {
return true;
}
return false;
}
static FORCE_INLINE bool mJoinProbeTbNeedRetrieve(SOperatorInfo* pOperator, SMJoinTableCtx* pProbeCtx, SMJoinTableCtx* pBuildCtx) {
SMJoinOperatorInfo* pJoin = pOperator->info;
if (MJOIN_DOWNSTREAM_NEED_INIT(pOperator) && !pBuildCtx->dsInitDone) {
return true;
}
if (((!pProbeCtx->grpRetrieved) && (!pProbeCtx->dsFetchDone)) && (pBuildCtx->grpRetrieved || pJoin->ctx.flags.retrieveAfterBuildDone))) {
return true;
}
return false;
}
static bool mJoinOpenMergeJoin(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx;
SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx;
int32_t code = mJoinDoRetrieve(pOperator, pBuildCtx, pJoin->pBuild);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
if (pBuildCtx->dsFetchDone && !pJoin->ctx.flags.retrieveAfterBuildDone) {
return false;
}
code = mJoinDoRetrieve(pOperator, pProbeCtx, pJoin->pProbe);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
return true;
}
static void mJoinInitJoinCtx(SMJoinOperatorInfo* pJoin) {
}
static void mJoinLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx;
SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx;
int32_t code = mJoinDoRetrieve(pOperator, pProbeCtx, pJoin->pProbe);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
if (NULL == pProbeCtx->pHeadBlk) {
return;
}
code = mJoinDoRetrieve(pJoin, pBuildCtx, pJoin->pBuild);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
pJoin->ctx.mergeCtx.joinPhase = E_JOIN_PHASE_TS_JOIN;
}
static FORCE_INLINE void mJoinLefeJoinUpdateTsJoinCtx(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx, SMJoinTableCtx* pBuildCtx) {
pCtx->probeRowNum = pProbeCtx->pCurrBlk->pBlk->info.rows;
pCtx->buildRowNum = pBuildCtx->pCurrBlk->pBlk->info.rows;
SColumnInfoData* probeCol = taosArrayGet(pProbeCtx->pCurrBlk->pBlk, pProbeCtx->pTbInfo->primCol->srcSlot);
SColumnInfoData* buildCol = taosArrayGet(pBuildCtx->pHeadBlk->pBlk, pBuildCtx->pTbInfo->primCol->srcSlot);
pCtx->probeTs = (int64_t*)probeCol->pData;
pCtx->probeEndTs = (int64_t*)probeCol->pData + pCtx->probeRowNum - 1;
pCtx->buildTs = (int64_t*)buildCol->pData;
pCtx->buildEndTs = (int64_t*)buildCol->pData + pCtx->buildRowNum - 1;
}
static bool mJoinMoveToNextProbeTable(SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pProbeCtx) {
if (NULL == pProbeCtx->pCurrBlk->pNext) {
pProbeCtx->blkIdx++;
return false;
}
pProbeCtx->pCurrBlk = pProbeCtx->pCurrBlk->pNext;
pCtx->probeRowNum = pProbeCtx->pCurrBlk->pBlk->info.rows;
SColumnInfoData* probeCol = taosArrayGet(pProbeCtx->pCurrBlk->pBlk, pProbeCtx->pTbInfo->primCol->srcSlot);
pCtx->probeTs = (int64_t*)probeCol->pData;
pCtx->probeEndTs = (int64_t*)probeCol->pData + pCtx->probeRowNum - 1;
pProbeCtx->blkRowIdx = 0;
return true;
}
static bool mJoinMoveToNextBuildTable(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx* pCtx, SMJoinTableCtx* pBuildCtx, SMJoinTableCtx* pProbeCtx) {
bool contLoop = false;
bool res = false;
do {
if (pBuildCtx->pCurrBlk->pNext) {
pBuildCtx->blkIdx++;
return false;
}
pBuildCtx->pCurrBlk = pBuildCtx->pCurrBlk->pNext;
pCtx->buildRowNum = pBuildCtx->pCurrBlk->pBlk->info.rows;
SColumnInfoData* buildCol = taosArrayGet(pBuildCtx->pCurrBlk->pBlk, pBuildCtx->pTbInfo->primCol->srcSlot);
pCtx->buildTs = (int64_t*)buildCol->pData;
pCtx->buildEndTs = (int64_t*)buildCol->pData + pCtx->buildRowNum - 1;
pBuildCtx->blkRowIdx = 0;
do {
if (*pCtx->buildTs > pCtx->probeTs[pProbeCtx->blkRowIdx]) {
mJoinLeftJoinAddBlkToGrp(pJoin, pCtx, pProbeCtx, pBuildCtx);
contLoop = mJoinMoveToNextProbeTable(pCtx, pProbeCtx);
} else if (pCtx->probeTs[pProbeCtx->blkRowIdx] > *pCtx->buildEndTs) {
break;
} else {
contLoop = false;
res = true;
}
} while (contLoop);
} while (contLoop);
return res;
}
static void mJoinLeftJoinSplitGrp(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
SMJoinTsJoinCtx* pCtx = &pJoin->ctx.mergeCtx.tsJoinCtx;
SMJoinTableCtx* pProbeCtx = &pJoin->ctx.mergeCtx.probeTbCtx;
SMJoinTableCtx* pBuildCtx = &pJoin->ctx.mergeCtx.buildTbCtx;
mJoinLefeJoinUpdateTsJoinCtx(pCtx, pProbeCtx, pBuildCtx);
bool nextRow = false;
for (; pProbeCtx->blkIdx < pProbeCtx->blkNum; mJoinMoveToNextProbeTable(pCtx, pProbeCtx)) {
if (*pCtx->buildTs > *pCtx->probeEndTs) {
mJoinLeftJoinAddBlkToGrp(pJoin, pCtx, pProbeCtx, pBuildCtx);
continue;
} else if (*pCtx->probeTs > *pCtx->buildEndTs) {
if (!mJoinMoveToNextBuildTable(pJoin, pCtx, pBuildCtx, pProbeCtx)) {
break;
//retrieve build
}
}
if (leftTs == rightTs) {
mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows);
} else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) {
pJoinInfo->leftPos += 1;
for (; pProbeCtx->blkRowIdx < pCtx->probeRowNum; ++pProbeCtx->blkRowIdx) {
for (; pBuildCtx->blkIdx < pBuildCtx->blkNum; mJoinMoveToNextBuildTable(pJoin, pCtx, pBuildCtx, pProbeCtx)) {
for (; pBuildCtx->blkRowIdx < pCtx->buildRowNum; ++pBuildCtx->blkRowIdx) {
if (pCtx->probeTs[pProbeCtx->blkRowIdx] > pCtx->buildTs[pBuildCtx->blkRowIdx]) {
FIN_SAME_TS_GRP();
FIN_DIFF_TS_GRP();
continue;
} else if (pCtx->probeTs[pProbeCtx->blkRowIdx] == pCtx->buildTs[pBuildCtx->blkRowIdx]) {
FIN_DIFF_TS_GRP();
if (pCtx->inSameTsGrp) {
pCtx->currGrpPairCtx.buildGrp.grpRowNum++;
} else {
pCtx->inSameTsGrp = true;
pCtx->currGrpPairCtx.buildGrp.grpRowBeginIdx = pBuildCtx->blkRowIdx;
}
} else {
FIN_SAME_TS_GRP();
if (pCtx->inDiffTsGrp) {
pCtx->currGrpPairCtx.probeGrp.grpRowNum++;
} else {
pCtx->inDiffTsGrp = true;
pCtx->currGrpPairCtx.probeGrp.grpRowBeginIdx = pProbeCtx->blkRowIdx;
}
nextRow = true;
break;
}
}
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) {
// end of single build table
if (nextRow) {
break;
}
}
// end of all build tables
if (nextRow) {
continue;
}
} else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) {
pJoinInfo->rightPos += 1;
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) {
continue;
if (pCtx->inSameTsGrp) {
PAUSE_SAME_TS_GRP();
}
}
// the pDataBlock are always the same one, no need to call this again
pRes->info.rows = nrows;
pRes->info.dataLoad = 1;
pRes->info.scanFlag = MAIN_SCAN;
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break;
}
// end of single probe table
if (nextRow) {
continue;
}
break;
}
// end of all probe tables
FIN_DIFF_TS_GRP();
pJoin->ctx.mergeCtx.joinPhase = E_JOIN_PHASE_OUTPUT;
return true;
}
static bool mJoinLeftJoinOutput(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
}
static SSDataBlock* mJoinDoLeftJoin(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
bool contLoop = false;
do {
switch (pJoin->ctx.mergeCtx.joinPhase) {
case E_JOIN_PHASE_RETRIEVE:
contLoop = mJoinLeftJoinRetrieve(pOperator, pJoin);
break;
case E_JOIN_PHASE_SPLIT:
contLoop = mJoinLeftJoinSplitGrp(pOperator, pJoin);
break;
case E_JOIN_PHASE_OUTPUT:
contLoop = mJoinLeftJoinOutput(pOperator, pJoin);
break;
}
} while (contLoop);
}
static SSDataBlock* mJoinHanleMergeJoin(SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoin = pOperator->info;
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx;
SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx;
int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pBlock = NULL;
while (true) {
pBlock = (*pJoin->joinFp)(pOperator, pJoin, pCtx, pBuildCtx, pProbeCtx);
if (pBlock && pBlock->info.rows > 0) {
return pBlock;
}
}
}
SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoinInfo = pOperator->info;
SMJoinOperatorInfo* pJoin = pOperator->info;
if (pOperator->status == OP_EXEC_DONE) {
if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] || NULL == pOperator->pDownstreamGetParams[1]) {
qDebug("%s total merge join res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pJoinInfo->resRows);
qDebug("%s total merge join res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pJoin->resRows);
return NULL;
} else {
resetMergeJoinOperator(pOperator);
@ -329,20 +552,18 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) {
st = taosGetTimestampUs();
}
SSDataBlock* pRes = pJoinInfo->pRes;
blockDataCleanup(pRes);
SSDataBlock* pBlock = NULL;
//blockDataCleanup(pJoin->pRes);
while (true) {
int32_t numOfRowsBefore = pRes->info.rows;
mJoinImpl(pOperator, pRes);
int32_t numOfNewRows = pRes->info.rows - numOfRowsBefore;
if (numOfNewRows == 0) {
pBlock = (*pJoin->joinFp)(pOperator);
if (NULL == pBlock) {
break;
}
if (pJoinInfo->pFinFilter != NULL) {
doFilter(pRes, pJoinInfo->pFinFilter, NULL);
if (pJoin->pFinFilter != NULL) {
doFilter(pBlock, pJoin->pFinFilter, NULL);
}
if (pRes->info.rows > 0 || pOperator->status == OP_EXEC_DONE) {
if (pBlock->info.rows > 0 || pOperator->status == OP_EXEC_DONE) {
break;
}
}
@ -351,12 +572,12 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
}
if (pRes->info.rows > 0) {
pJoinInfo->resRows += pRes->info.rows;
qDebug("%s merge join returns res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pRes->info.rows);
return pRes;
if (pBlock->info.rows > 0) {
pJoin->resRows += pBlock->info.rows;
qDebug("%s merge join returns res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pBlock->info.rows);
return pBlock;
} else {
qDebug("%s total merge join res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pJoinInfo->resRows);
qDebug("%s total merge join res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pJoin->resRows);
return NULL;
}
}
@ -374,6 +595,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
goto _error;
}
pInfo->pOperator = pOperator;
code = mJoinInitDownstreamInfo(pInfo, pDownstream, numOfDownstream, newDownstreams);
if (TSDB_CODE_SUCCESS != code) {
goto _error;
@ -391,6 +613,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
mJoinSetBuildAndProbeTable(pInfo, pJoinNode);
mJoinInitJoinCtx(pInfo);
code = mJoinBuildResColMap(pInfo, pJoinNode);
if (code) {
goto _error;