enh: left join

This commit is contained in:
dapan1121 2023-12-08 15:27:22 +08:00
parent d7535f832a
commit 7fe919ad16
2 changed files with 164 additions and 119 deletions

View File

@ -53,25 +53,9 @@ typedef struct SMJoinColInfo {
char* bitMap;
} SMJoinColInfo;
typedef struct SMJoinCartGrp {
bool sameTsGrp;
bool firstArrIdx;
bool secondArrIdx;
int32_t firstRowIdx;
int32_t firstRowNum;
int32_t secondRowIdx;
int32_t secondRowNum;
} SMJoinCartGrp;
typedef struct SMJoinCartBlk {
SSDataBlock* pFirstBlk;
SSDataBlock* pSecondBlk;
SArray* pBlkGrps;
} SMJoinCartBlk;
typedef struct SMJoinCartCtx {
bool appendRes;
bool firstOnly;
int32_t resThreshold;
SSDataBlock* pResBlk;
@ -80,12 +64,17 @@ typedef struct SMJoinCartCtx {
int32_t secondColNum;
SMJoinColMap* pSecondCols;
SArray* pCartRowIdx;
SArray* pCartBlks;
SMJoinBlkInfo* pFirstBlk;
SMJoinBlkInfo* pSecondBlk;
int32_t firstRowIdx;
int32_t firstRowNum;
int32_t secondRowIdx;
int32_t secondRowNum;
} SMJoinCartCtx;
typedef struct SMJoinBlkInfo {
bool cloned;
bool inUse;
SSDataBlock* pBlk;
void* pNext;
} SMJoinBlkInfo;
@ -142,41 +131,44 @@ typedef struct SMJoinTsJoinCtx {
} SMJoinTsJoinCtx;
typedef struct SBuildGrpResIn {
bool multiBlkGrp;
SMJoinBlkInfo* grpBeginBlk;
int32_t grpRowBeginIdx;
int32_t grpRowNum;
bool multiBlk;
SMJoinBlkInfo* pBeginBlk;
int32_t rowBeginIdx;
int32_t rowNum;
} SBuildGrpResIn;
typedef struct SBuildGrpResOut {
SSHashObj* pGrpHash;
int32_t grpRowReadIdx;
int32_t grpRowGReadIdx;
SSHashObj* pHash;
SMJoinBlkInfo* pCurrBlk;
int32_t rowReadIdx;
int32_t rowGReadNum;
} SBuildGrpResOut;
typedef struct SProbeGrpResIn {
bool allRowsGrp;
SMJoinBlkInfo* grpBeginBlk;
int32_t grpRowBeginIdx;
int32_t grpRowNum;
SMJoinBlkInfo* pBeginBlk;
int32_t rowBeginIdx;
int32_t rowNum;
int64_t grpLastTs;
} SProbeGrpResIn;
typedef struct SProbeGrpResOut {
int32_t grpRowReadIdx;
SMJoinBlkInfo* pCurrBlk;
int32_t rowReadIdx;
int32_t rowGReadNum;
} SProbeGrpResOut;
typedef struct SGrpPairRes {
bool sameTsGrp;
bool finishGrp;
bool hashJoin;
SProbeGrpResIn probeIn;
SBuildGrpResIn buildIn;
SProbeGrpResIn prbIn;
SBuildGrpResIn bldIn;
/* KEEP THIS PART AT THE END */
bool outBegin;
SBuildGrpResOut buildOut;
SProbeGrpResOut probeOut;
SBuildGrpResOut bldOut;
SProbeGrpResOut prbOut;
/* KEEP THIS PART AT THE END */
} SGrpPairRes;
@ -269,10 +261,12 @@ typedef struct SMJoinOperatorInfo {
#define START_NEW_GRP(_ctx) memset(&(_ctx)->currGrpPair, 0, GRP_PAIR_INIT_SIZE)
#define REACH_HJOIN_THRESHOLD(_pair) ((_pair)->buildIn.grpRowNum * (_pair)->probeIn.grpRowNum > MJOIN_HJOIN_CART_THRESHOLD)
#define REACH_HJOIN_THRESHOLD(_pair) ((_pair)->buildIn.rowNum * (_pair)->probeIn.rowNum > MJOIN_HJOIN_CART_THRESHOLD)
#define SET_SAME_TS_GRP_HJOIN(_pair, _octx) ((_pair)->hashJoin = (_octx)->hashCan && REACH_HJOIN_THRESHOLD(_pair))
#define BUILD_TB_BROKEN_BLK(_sg, _out, _in) ((_sg) && (((_out)->pCurrBlk == (_in)->pBeginBlk && (_out)->rowReadIdx != (_in)->rowBeginIdx) || ((_out)->pCurrBlk != (_in)->pBeginBlk && (_out)->rowReadIdx != 0)))
#define FIN_SAME_TS_GRP(_ctx, _octx, _done) do { \
if ((_ctx)->inSameTsGrp) { \
(_ctx)->currGrpPair.sameTsGrp = true; \
@ -298,7 +292,8 @@ typedef struct SMJoinOperatorInfo {
} \
} while (0)
#define CURRENT_BLK_GRP_ROWS(_in) (((_in)->grpRowNum + (_in)->grpRowBeginIdx) <= (_in)->grpBeginBlk->pBlk->info.rows ? (_in)->grpRowNum : ((_in)->grpBeginBlk->pBlk->info.rows - (_in)->grpRowBeginIdx))
#define PRB_CUR_BLK_GRP_ROWS(_rn, _rb, _bn) (((_rn) + (_rb)) <= (_bn) ? (_rn) : ((_bn) - (_rb)))
#define BLD_CUR_BLK_GRP_ROWS(_sg, _rn, _rb, _bn) ((_sg) ? 1 : (((_rn) + (_rb)) <= (_bn) ? (_rn) : ((_bn) - (_rb))))
#ifdef __cplusplus

View File

@ -320,15 +320,6 @@ static int32_t mJoinInitMergeCtx(SOperatorInfo* pOperator, SMJoinOperatorInfo* p
pCtx->outputCtx.cartCtx.secondColNum = pBuildCtx->pTbInfo->finNum;
pCtx->outputCtx.cartCtx.pSecondCols = pBuildCtx->pTbInfo->finCols;
pCtx->outputCtx.cartCtx.pCartRowIdx = taosArrayInit(MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM, sizeof(int32_t));
if (NULL == pCtx->outputCtx.cartCtx.pCartRowIdx) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCtx->outputCtx.cartCtx.pCartGrps = taosArrayInit(MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM, sizeof(SMJoinCartGrp));
if (NULL == pCtx->outputCtx.cartCtx.pCartGrps) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
@ -358,7 +349,10 @@ static int32_t mJoinAddBlkToList(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pCtx
if (NULL == pNew) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pNew->pBlk = pBlock;
pNew->inUse = true;
if (NULL == pCtx->pTailBlk) {
pCtx->pTailBlk = pCtx->pHeadBlk = pNew;
pCtx->pCurrBlk = pCtx->pHeadBlk;
@ -493,13 +487,13 @@ static void mJoinLeftJoinAddBlkToGrp(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx*
int32_t rowNum = pProbeCtx->pCurrBlk->pBlk->info.rows - pProbeCtx->blkRowIdx;
if (pCtx->nextProbeRow && pCtx->inDiffTsGrp) {
pCtx->currGrpPair->probeIn.grpRowNum += rowNum;
pCtx->currGrpPair->prbIn.rowNum += rowNum;
} else {
pCtx->inDiffTsGrp = true;
START_NEW_GRP(pCtx);
pCtx->currGrpPair.probeIn.grpBeginBlk = pProbeCtx->pCurrBlk;
pCtx->currGrpPair.probeIn.grpRowBeginIdx = pProbeCtx->blkRowIdx;
pCtx->currGrpPair->probeIn.grpRowNum = rowNum;
pCtx->currGrpPair.prbIn.pBeginBlk = pProbeCtx->pCurrBlk;
pCtx->currGrpPair.prbIn.rowBeginIdx = pProbeCtx->blkRowIdx;
pCtx->currGrpPair->prbIn.rowNum = rowNum;
}
pCtx->nextProbeRow = true;
@ -531,6 +525,7 @@ static bool mJoinBuildMoveToNextBlk(SMJoinOperatorInfo* pJoin, SMJoinTsJoinCtx*
contLoop = mJoinProbeMoveToNextBlk(pCtx, pProbeCtx);
} else if (pCtx->probeTs[pProbeCtx->blkRowIdx] > *pCtx->buildEndTs) {
contLoop = true;
pBuildCtx->pCurrBlk->inUse = false;
break;
} else {
contLoop = false;
@ -563,8 +558,8 @@ static bool mLeftJoinSplitGrpImpl(SOperatorInfo* pOperator, SMJoinOperatorInfo*
for (; pProbeCtx->blkRowIdx < pCtx->probeRowNum; ++pProbeCtx->blkRowIdx) {
if (pCtx->pLastGrpPair && pCtx->pLastGrpPair->sameTsGrp
&& pCtx->probeTs[pProbeCtx->blkRowIdx] == pCtx->pLastGrpPair->probeIn.grpLastTs) {
pCtx->pLastGrpPair->probeIn.grpRowNum++;
&& pCtx->probeTs[pProbeCtx->blkRowIdx] == pCtx->pLastGrpPair->prbIn.grpLastTs) {
pCtx->pLastGrpPair->prbIn.rowNum++;
SET_SAME_TS_GRP_HJOIN(pCtx->pLastGrpPair, pOutCtx);
continue;
}
@ -578,29 +573,29 @@ static bool mLeftJoinSplitGrpImpl(SOperatorInfo* pOperator, SMJoinOperatorInfo*
} else if (pCtx->probeTs[pProbeCtx->blkRowIdx] == pCtx->buildTs[pBuildCtx->blkRowIdx]) {
FIN_DIFF_TS_GRP(pCtx, pOutCtx, false);
if (pCtx->inSameTsGrp) {
pCtx->currGrpPair.buildIn.grpRowNum++;
pCtx->currGrpPair.bldIn.rowNum++;
} else {
pCtx->inSameTsGrp = true;
START_NEW_GRP(pCtx);
pCtx->currGrpPair.buildIn.grpBeginBlk = pBuildCtx->pCurrBlk;
pCtx->currGrpPair.buildIn.grpRowBeginIdx = pBuildCtx->blkRowIdx;
pCtx->currGrpPair.buildIn.grpRowNum = 1;
pCtx->currGrpPair.probeIn.grpBeginBlk = pProbeCtx->pCurrBlk;
pCtx->currGrpPair.probeIn.grpLastTs = pCtx->probeTs[pProbeCtx->blkRowIdx];
pCtx->currGrpPair.probeIn.grpRowBeginIdx = pProbeCtx->blkRowIdx;
pCtx->currGrpPair.probeIn.grpRowNum = 1;
pCtx->currGrpPair.bldIn.pBeginBlk = pBuildCtx->pCurrBlk;
pCtx->currGrpPair.bldIn.rowBeginIdx = pBuildCtx->blkRowIdx;
pCtx->currGrpPair.bldIn.rowNum = 1;
pCtx->currGrpPair.prbIn.pBeginBlk = pProbeCtx->pCurrBlk;
pCtx->currGrpPair.prbIn.grpLastTs = pCtx->probeTs[pProbeCtx->blkRowIdx];
pCtx->currGrpPair.prbIn.rowBeginIdx = pProbeCtx->blkRowIdx;
pCtx->currGrpPair.prbIn.rowNum = 1;
}
pCtx->nextProbeRow = false;
} else {
FIN_SAME_TS_GRP(pCtx, pOutCtx, true);
if (pCtx->inDiffTsGrp) {
pCtx->currGrpPair.probeIn.grpRowNum++;
pCtx->currGrpPair.prbIn.rowNum++;
} else {
pCtx->inDiffTsGrp = true;
START_NEW_GRP(pCtx);
pCtx->currGrpPair.probeIn.grpBeginBlk = pProbeCtx->pCurrBlk;
pCtx->currGrpPair.probeIn.grpRowBeginIdx = pProbeCtx->blkRowIdx;
pCtx->currGrpPair.probeIn.grpRowNum = 1;
pCtx->currGrpPair.prbIn.pBeginBlk = pProbeCtx->pCurrBlk;
pCtx->currGrpPair.prbIn.rowBeginIdx = pProbeCtx->blkRowIdx;
pCtx->currGrpPair.prbIn.rowNum = 1;
}
pCtx->nextProbeRow = true;
break;
@ -654,7 +649,8 @@ static bool mLeftJoinSplitGrp(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoi
if (0 == pJoin->ctx.mergeCtx.buildTbCtx.blkNum) {
ASSERTS(pJoin->ctx.mergeCtx.buildTbCtx.dsFetchDone, "left join empty build table while fetch not done");
FIN_SAME_TS_GRP(pCtx, &pJoin->ctx.mergeCtx.outputCtx, true);
FIN_DIFF_TS_GRP(pCtx, &pJoin->ctx.mergeCtx.outputCtx, pJoin->ctx.mergeCtx.buildTbCtx.dsFetchDone);
} else {
mLeftJoinSplitGrpImpl(pOperator, pJoin);
@ -670,7 +666,7 @@ static void mLeftJoinCart(SMJoinCartCtx* pCtx) {
for (int32_t c = 0; c < pCtx->firstColNum; ++c) {
SMJoinColMap* pFirstCol = pCtx->pFirstCols + c;
SColumnInfoData* pInCol = taosArrayGet(pCtx->pFirstBlk->pDataBlock, pFirstCol->srcSlot);
SColumnInfoData* pInCol = taosArrayGet(pCtx->pFirstBlk->pBlk, pFirstCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pFirstCol->dstSlot);
for (int32_t r = 0; r < pCtx->firstRowNum; ++r) {
if (colDataIsNull_s(pInCol, pCtx->firstRowIdx + r)) {
@ -681,33 +677,44 @@ static void mLeftJoinCart(SMJoinCartCtx* pCtx) {
}
}
for (int32_t c = 0; c < pCtx->secondColNum; ++c) {
SMJoinColMap* pSecondCol = pCtx->pSecondCols + c;
SColumnInfoData* pInCol = taosArrayGet(pCtx->pSecondBlk->pDataBlock, pSecondCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pSecondCol->dstSlot);
for (int32_t r = 0; r < pCtx->firstRowNum; ++r) {
colDataAssignNRows(pOutCol, currRows + r * pCtx->secondRowNum, pInCol, pCtx->secondRowIdx, pCtx->secondRowNum);
if (pCtx->firstOnly) {
ASSERT(1 == pCtx->secondRowNum);
for (int32_t c = 0; c < pCtx->secondColNum; ++c) {
SMJoinColMap* pSecondCol = pCtx->pSecondCols + c;
SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pSecondCol->dstSlot);
colDataSetNItemsNull(pOutCol, currRows, pCtx->firstRowNum);
}
} else {
for (int32_t c = 0; c < pCtx->secondColNum; ++c) {
SMJoinColMap* pSecondCol = pCtx->pSecondCols + c;
SColumnInfoData* pInCol = taosArrayGet(pCtx->pSecondBlk->pBlk, pSecondCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pSecondCol->dstSlot);
for (int32_t r = 0; r < pCtx->firstRowNum; ++r) {
colDataAssignNRows(pOutCol, currRows + r * pCtx->secondRowNum, pInCol, pCtx->secondRowIdx, pCtx->secondRowNum);
}
}
}
pCtx->pResBlk.info.rows += pCtx->firstRowNum * pCtx->secondRowNum;
}
static bool mLeftJoinSameTsOutput(SMJoinOperatorInfo* pJoin, SGrpPairRes* pPair) {
SProbeGrpResIn* pProbeIn = &pPair->probeIn;
SBuildGrpResIn* pBuildIn = &pPair->buildIn;
SProbeGrpResIn* pProbeIn = &pPair->prbIn;
SBuildGrpResIn* pBuildIn = &pPair->prbIn;
if (!pPair->outBegin) {
}
SMJoinBlkInfo* pBInfo = pProbeIn->grpBeginBlk;
SMJoinBlkInfo* pBInfo = pProbeIn->pBeginBlk;
do {
if (pJoin->prevFilter) {
} else {
if (pPair->buildOut.hashJoin) {
if (pPair->bldOut.hashJoin) {
} else {
for (; pProbeIn->grpRowBeginIdx < pBInfo->pBlk->info.rows && pProbeIn->grpRowNum > 0; pProbeIn->grpRowBeginIdx++, pProbeIn->grpRowNum--) {
for (; pProbeIn->rowBeginIdx < pBInfo->pBlk->info.rows && pProbeIn->rowNum > 0; pProbeIn->rowBeginIdx++, pProbeIn->rowNum--) {
}
}
@ -719,64 +726,107 @@ static bool mLeftJoinCartOutput(SMJoinOperatorInfo* pJoin, SMJoinOutputCtx* pCtx
bool contLoop = false;
SMJoinCartCtx* pCart = &pCtx->cartCtx;
int32_t grpNum = taosArrayGetSize(pCtx->pGrpResList);
int32_t rowsLeft = pCart->pResBlk pCart->pResBlk->info.rows;
int32_t rowsLeft = pCart->pResBlk->info.capacity - pCart->pResBlk->info.rows;
int32_t probeRows = 0;
int32_t buildRows = 0;
bool grpDone = false, brokenBlk = false;
for (; pCtx->grpReadIdx < grpNum; pCtx->grpReadIdx++) {
SGrpPairRes* pPair = taosArrayGet(pCtx->pGrpResList, pCtx->grpReadIdx);
if (!pPair->finishGrp) {
for (; pCtx->grpReadIdx < grpNum && pCart->pResBlk->info.rows <= pCart->resThreshold; pCtx->grpReadIdx++) {
SGrpPairRes* pair = taosArrayGet(pCtx->pGrpResList, pCtx->grpReadIdx);
if (!pair->finishGrp) {
ASSERTS(pCtx->grpReadIdx == grpNum - 1, "unfinished grp not the last");
taosArrayRemoveBatch(pCtx->pGrpResList, 0, pCtx->grpReadIdx - 1, NULL);
pCtx->grpReadIdx = 0;
break;
}
if (pPair->hashJoin) {
contLoop = mLeftJoinHashOutput(pJoin, pPair);
} else if (pCtx->cartCtx.appendRes) {
contLoop = mLeftJoinDirectOutput(pJoin, pPair);
}
grpDone = false;
pCart->firstOnly = !pair->sameTsGrp;
do {
if (!pair->outBegin) {
probeRows = PRB_CUR_BLK_GRP_ROWS(pair->prbIn.rowNum, pair->prbIn.rowBeginIdx, pair->prbIn.pBeginBlk->pBlk->info.rows);
buildRows = BLD_CUR_BLK_GRP_ROWS(pair->sameTsGrp, pair->bldIn.rowNum, pair->bldIn.rowBeginIdx, pair->bldIn.pBeginBlk->pBlk->info.rows);
pCart->pFirstBlk = pair->prbIn.pBeginBlk;
pCart->firstRowIdx = pair->prbIn.rowBeginIdx;
pair->prbOut.pCurrBlk = pair->prbIn.pBeginBlk;
if (pair->sameTsGrp) {
pCart->pSecondBlk = pair->bldIn.pBeginBlk;
pCart->secondRowIdx = pair->bldIn.rowBeginIdx;
pair->bldOut.pCurrBlk = pair->bldIn.pBeginBlk;
}
pair->outBegin = true;
brokenBlk = false;
} else if (BUILD_TB_BROKEN_BLK(pair->sameTsGrp, &pair->bldOut, &pair->bldIn)) {
probeRows = PRB_CUR_BLK_GRP_ROWS(1, pair->prbOut.rowReadIdx, pair->prbOut.pCurrBlk->pBlk.info.rows);
buildRows = BLD_CUR_BLK_GRP_ROWS(pair->bldIn.rowNum - pair->bldOut.rowGReadNum, pair->bldOut.rowReadIdx, pair->bldOut.pCurrBlk->pBlk.info.rows);
pCart->firstRowIdx = pair->prbOut.rowReadIdx;
pCart->secondRowIdx = pair->bldOut.rowReadIdx;
brokenBlk = true;
} else {
probeRows = PRB_CUR_BLK_GRP_ROWS(pair->prbIn.rowNum - pair->prbOut.rowGReadNum, pair->prbOut.rowReadIdx, pair->prbOut.pCurrBlk->pBlk.info.rows);
buildRows = BLD_CUR_BLK_GRP_ROWS(pair->bldIn.rowNum - pair->bldOut.rowGReadNum, pair->bldOut.rowReadIdx, pair->bldOut.pCurrBlk->pBlk.info.rows);
pCart->firstRowIdx = pair->prbOut.rowReadIdx;
if (pair->sameTsGrp) {
pCart->secondRowIdx = pair->bldOut.rowReadIdx;
}
brokenBlk = false;
}
if (!contLoop) {
return false;
}
}
}
int64_t reqNum = probeRows * buildRows;
if (reqNum <= rowsLeft) {
pCart->firstRowNum = probeRows;
pCart->secondRowNum = buildRows;
pair->prbOut.rowGReadNum += probeRows;
if (!brokenBlk) {
pair->prbOut.rowReadIdx = 0;
static bool mLeftJoinOutput(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
SMJoinOutputCtx* pCtx = &pJoin->ctx.mergeCtx.outputCtx;
bool contLoop = false;
int32_t grpNum = taosArrayGetSize(pCtx->pGrpResList);
if (pair->sameTsGrp) {
pair->bldOut.rowGReadNum += buildRows;
pair->bldOut.rowReadIdx = 0;
}
} else {
pair->prbOut.rowReadIdx += probeRows;
}
if (pCtx->cartCtx.appendRes) {
return mLeftJoinCartOutput(pJoin, pCtx);
}
for (; pCtx->grpReadIdx < grpNum; pCtx->grpReadIdx++) {
SGrpPairRes* pPair = taosArrayGet(pCtx->pGrpResList, pCtx->grpReadIdx);
if (!pPair->finishGrp) {
ASSERTS(pCtx->grpReadIdx == grpNum - 1, "unfinished grp not the last");
taosArrayRemoveBatch(pCtx->pGrpResList, 0, pCtx->grpReadIdx - 1, NULL);
pCtx->grpReadIdx = 0;
if (pair->prbOut.rowGReadNum >= pair->prbIn.rowNum) {
grpDone = true;
}
rowsLeft -= reqNum;
} else if (buildRows <= rowsLeft) {
pCart->firstRowNum = brokenBlk ? 1 : (rowsLeft / buildRows);
pair->prbOut.rowGReadNum += pCart->firstRowNum;
pair->prbOut.rowReadIdx = pCart->firstRowIdx + pCart->firstRowNum;
pCart->secondRowNum = buildRows;
pair->bldOut.rowReadIdx = 0;
rowsLeft -= (pCart->firstRowNum * pCart->secondRowNum);
} else {
ASSERT(pair->sameTsGrp);
pCart->firstRowNum = 1;
pCart->secondRowNum = rowsLeft;
pair->bldOut.rowReadIdx = pCart->secondRowIdx + rowsLeft;
rowsLeft = 0;
}
mLeftJoinCart(pCart);
}while ((!grpDone) && pCart->pResBlk->info.rows <= pCart->resThreshold);
if (!grpDone) {
break;
}
if (pPair->hashJoin) {
contLoop = mLeftJoinHashOutput(pJoin, pPair);
} else if (pCtx->cartCtx.appendRes) {
contLoop = mLeftJoinDirectOutput(pJoin, pPair);
}
if (!contLoop) {
return false;
}
}
pJoin->ctx.mergeCtx.joinPhase = E_JOIN_PHASE_RETRIEVE;
return true;
}
static SSDataBlock* mJoinDoLeftJoin(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
bool contLoop = false;