From 194b1f03ae4e4f1dcc43c1399f8490dba7466a5e Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 26 Dec 2023 19:28:19 +0800 Subject: [PATCH] enh: support inner join --- include/libs/nodes/querynodes.h | 1 + source/libs/executor/inc/executorInt.h | 1 + source/libs/executor/inc/mergejoin.h | 31 +- source/libs/executor/src/mergejoin.c | 954 ++++++++++--------- source/libs/executor/src/mergejoinoperator.c | 350 ++++++- source/libs/nodes/src/nodesUtilFuncs.c | 48 + source/libs/planner/src/planOptimizer.c | 67 +- source/libs/planner/src/planPhysiCreater.c | 47 +- tests/script/tsim/join/left_join.sim | 66 +- 9 files changed, 1033 insertions(+), 532 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 33a3b6aba7..890064812d 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -599,6 +599,7 @@ bool nodesIsTableStar(SNode* pNode); char* getJoinTypeString(EJoinType type); char* getJoinSTypeString(EJoinSubType type); char* getFullJoinTypeString(EJoinType type, EJoinSubType stype); +int32_t mergeJoinConds(SNode** ppDst, SNode** ppSrc); #ifdef __cplusplus diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index e3e504cdbc..b8a83959f3 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -743,6 +743,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* de extern void doDestroyExchangeOperatorInfo(void* param); +int32_t doFilterImpl(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo, SColumnInfoData** pResCol); int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo* pCache); diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h 
index 4814986980..2fc2cd3c02 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -22,10 +22,12 @@ extern "C" { #define MJOIN_DEFAULT_BLK_ROWS_NUM 4096 #define MJOIN_HJOIN_CART_THRESHOLD 16 #define MJOIN_BLK_SIZE_LIMIT 10485760 +#define MJOIN_ROW_BITMAP_SIZE (2 * 1048576) struct SMJoinOperatorInfo; typedef SSDataBlock* (*joinImplFp)(SOperatorInfo*); +typedef int32_t (*joinCartFp)(void*); typedef enum EJoinTableType { E_JOIN_TB_BUILD = 1, @@ -33,6 +35,7 @@ typedef enum EJoinTableType { } EJoinTableType; #define MJOIN_TBTYPE(_type) (E_JOIN_TB_BUILD == (_type) ? "BUILD" : "PROBE") +#define IS_FULL_OUTER_JOIN(_jtype, _stype) ((_jtype) == JOIN_TYPE_FULL && (_stype) == JOIN_STYPE_OUTER) typedef struct SMJoinRowPos { SSDataBlock* pBlk; @@ -97,6 +100,10 @@ typedef struct SMJoinTableCtx { int32_t grpRowIdx; SArray* pHashCurGrp; SSHashObj* pGrpHash; + + int64_t rowBitmapSize; + int64_t rowBitmapOffset; + char* pRowBitmap; } SMJoinTableCtx; typedef struct SMJoinGrpRows { @@ -104,22 +111,29 @@ typedef struct SMJoinGrpRows { int32_t beginIdx; int32_t endIdx; int32_t readIdx; + int32_t rowBitmapOffset; + int32_t rowMatchNum; bool readMatch; } SMJoinGrpRows; typedef struct SMJoinMergeCtx { struct SMJoinOperatorInfo* pJoin; + bool ascTs; bool hashCan; bool keepOrder; bool grpRemains; bool midRemains; bool lastEqGrp; + bool lastProbeGrp; int32_t blkThreshold; SSDataBlock* midBlk; SSDataBlock* finBlk; int64_t lastEqTs; SMJoinGrpRows probeNEqGrp; + SMJoinGrpRows buildNEqGrp; bool hashJoin; + joinCartFp hashCartFp; + joinCartFp mergeCartFp; } SMJoinMergeCtx; typedef struct SMJoinWinCtx { @@ -178,8 +192,8 @@ typedef struct SMJoinOperatorInfo { #define SET_SAME_TS_GRP_HJOIN(_pair, _octx) ((_pair)->hashJoin = (_octx)->hashCan && REACH_HJOIN_THRESHOLD(_pair)) -#define LEFT_JOIN_NO_EQUAL(_order, _pts, _bts) ((_order) && (_pts) < (_bts)) || (!(_order) && (_pts) > (_bts)) -#define LEFT_JOIN_DISCRAD(_order, _pts, _bts) ((_order) && (_pts) > (_bts)) 
|| (!(_order) && (_pts) < (_bts)) +#define PROBE_TS_UNREACH(_order, _pts, _bts) ((_order) && (_pts) < (_bts)) || (!(_order) && (_pts) > (_bts)) +#define PROBE_TS_OVER(_order, _pts, _bts) ((_order) && (_pts) > (_bts)) || (!(_order) && (_pts) < (_bts)) #define GRP_REMAIN_ROWS(_grp) ((_grp)->endIdx - (_grp)->readIdx + 1) #define GRP_DONE(_grp) ((_grp)->readIdx > (_grp)->endIdx) @@ -225,19 +239,22 @@ typedef struct SMJoinOperatorInfo { } while (0) - - int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode); SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator); bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb); void mJoinSetDone(SOperatorInfo* pOperator); bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen); -void mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart); +int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart); int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableCtx* pTable, int64_t timestamp); int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable); int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable); - - +int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastBuildGrp); +bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build); +int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond); +int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx); +int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp, bool probeGrp); +int32_t mJoinNonEqCart(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp); +int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, 
SSDataBlock** ppFin); #ifdef __cplusplus } diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index d1f40e3c72..793f7ffccd 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -27,88 +27,59 @@ #include "ttypes.h" #include "mergejoin.h" -int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { - SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; +static int32_t mOuterJoinHashFullCart(SMJoinMergeCtx* pCtx) { + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; + SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); - pCtx->pJoin = pJoin; - pCtx->lastEqTs = INT64_MIN; - pCtx->hashCan = pJoin->probe->keyNum > 0; - - pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); - blockDataEnsureCapacity(pCtx->finBlk, TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize)); - - if (pJoin->pFPreFilter) { - pCtx->midBlk = createOneDataBlock(pCtx->finBlk, false); - blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity); - } - - pCtx->blkThreshold = pCtx->finBlk->info.capacity * 0.5; - - return TSDB_CODE_SUCCESS; -} - -static int32_t mLeftJoinGrpNonEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp) { - SMJoinTableCtx* probe = pJoin->probe; - SMJoinTableCtx* build = pJoin->build; - int32_t currRows = append ? 
pRes->info.rows : 0; - int32_t firstRows = GRP_REMAIN_ROWS(pGrp); - - for (int32_t c = 0; c < probe->finNum; ++c) { - SMJoinColMap* pFirstCol = probe->finCols + c; - SColumnInfoData* pInCol = taosArrayGet(pGrp->blk->pDataBlock, pFirstCol->srcSlot); - SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot); - colDataAssignNRows(pOutCol, currRows, pInCol, pGrp->readIdx, firstRows); - } - - for (int32_t c = 0; c < build->finNum; ++c) { - SMJoinColMap* pSecondCol = build->finCols + c; - SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot); - colDataSetNItemsNull(pOutCol, currRows, firstRows); - } - - pRes->info.rows = append ? (pRes->info.rows + firstRows) : firstRows; - return TSDB_CODE_SUCCESS; -} - -static int32_t mLeftJoinGrpEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond) { - SMJoinTableCtx* probe = pJoin->probe; - SMJoinTableCtx* build = pJoin->build; - int32_t currRows = append ? pRes->info.rows : 0; - int32_t firstRows = GRP_REMAIN_ROWS(pFirst); - int32_t secondRows = GRP_REMAIN_ROWS(pSecond); - ASSERT(secondRows > 0); - - for (int32_t c = 0; c < probe->finNum; ++c) { - SMJoinColMap* pFirstCol = probe->finCols + c; - SColumnInfoData* pInCol = taosArrayGet(pFirst->blk->pDataBlock, pFirstCol->srcSlot); - SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot); - for (int32_t r = 0; r < firstRows; ++r) { - if (colDataIsNull_s(pInCol, pFirst->readIdx + r)) { - colDataSetNItemsNull(pOutCol, currRows + r * secondRows, secondRows); - } else { - ASSERT(pRes->info.capacity >= (pRes->info.rows + firstRows * secondRows)); - uint32_t startOffset = (IS_VAR_DATA_TYPE(pOutCol->info.type)) ? 
pOutCol->varmeta.length : ((currRows + r * secondRows) * pOutCol->info.bytes); - ASSERT((startOffset + 1 * pOutCol->info.bytes) <= pRes->info.capacity * pOutCol->info.bytes); - colDataSetNItems(pOutCol, currRows + r * secondRows, colDataGetData(pInCol, pFirst->readIdx + r), secondRows, true); - } + if (build->grpRowIdx >= 0) { + bool contLoop = mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build); + if (build->grpRowIdx < 0) { + probeGrp->readIdx++; + } + + if (!contLoop) { + goto _return; } } - for (int32_t c = 0; c < build->finNum; ++c) { - SMJoinColMap* pSecondCol = build->finCols + c; - SColumnInfoData* pInCol = taosArrayGet(pSecond->blk->pDataBlock, pSecondCol->srcSlot); - SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot); - for (int32_t r = 0; r < firstRows; ++r) { - colDataAssignNRows(pOutCol, currRows + r * secondRows, pInCol, pSecond->readIdx, secondRows); + size_t bufLen = 0; + int32_t probeEndIdx = probeGrp->endIdx; + for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) { + if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) { + probeGrp->endIdx = probeGrp->readIdx; + MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true)); + probeGrp->endIdx = probeEndIdx; + continue; + } + + SArray** pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen); + if (NULL == pGrp) { + probeGrp->endIdx = probeGrp->readIdx; + MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true)); + probeGrp->endIdx = probeEndIdx; + } else { + build->pHashCurGrp = *pGrp; + build->grpRowIdx = 0; + bool contLoop = mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build); + if (!contLoop) { + if (build->grpRowIdx < 0) { + probeGrp->readIdx++; + } + goto _return; + } } } - pRes->info.rows = append ? 
(pRes->info.rows + firstRows * secondRows) : firstRows * secondRows; +_return: + + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + return TSDB_CODE_SUCCESS; } -static int32_t mLeftJoinMergeFullCart(SMJoinMergeCtx* pCtx) { +static int32_t mOuterJoinMergeFullCart(SMJoinMergeCtx* pCtx) { int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; SMJoinTableCtx* probe = pCtx->pJoin->probe; SMJoinTableCtx* build = pCtx->pJoin->build; @@ -122,7 +93,7 @@ static int32_t mLeftJoinMergeFullCart(SMJoinMergeCtx* pCtx) { if (pFirstBuild->readIdx == pFirstBuild->beginIdx) { for (; build->grpIdx < buildGrpNum; ++build->grpIdx) { SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); - MJ_ERR_RET(mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); buildGrp->readIdx = buildGrp->beginIdx; } @@ -137,7 +108,7 @@ static int32_t mLeftJoinMergeFullCart(SMJoinMergeCtx* pCtx) { SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) { - MJ_ERR_RET(mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); rowsLeft -= GRP_REMAIN_ROWS(buildGrp); buildGrp->readIdx = buildGrp->beginIdx; continue; @@ -145,7 +116,7 @@ static int32_t mLeftJoinMergeFullCart(SMJoinMergeCtx* pCtx) { int32_t buildEndIdx = buildGrp->endIdx; buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1; - mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp); + mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp); buildGrp->readIdx += rowsLeft; buildGrp->endIdx = buildEndIdx; rowsLeft = 0; @@ -170,37 +141,7 @@ static int32_t mLeftJoinMergeFullCart(SMJoinMergeCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static int32_t mLeftJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, 
SSDataBlock** ppFin) { - SSDataBlock* pLess = NULL; - SSDataBlock* pMore = NULL; - if ((*ppMid)->info.rows < (*ppFin)->info.rows) { - pLess = (*ppMid); - pMore = (*ppFin); - } else { - pLess = (*ppFin); - pMore = (*ppMid); - } - - int32_t totalRows = pMore->info.rows + pLess->info.rows; - if (totalRows <= pMore->info.capacity) { - MJ_ERR_RET(blockDataMerge(pMore, pLess)); - blockDataCleanup(pLess); - pCtx->midRemains = false; - } else { - int32_t copyRows = pMore->info.capacity - pMore->info.rows; - MJ_ERR_RET(blockDataMergeNRows(pMore, pLess, pLess->info.rows - copyRows, copyRows)); - blockDataShrinkNRows(pLess, copyRows); - pCtx->midRemains = true; - } - - if (pMore != (*ppFin)) { - TSWAP(*ppMid, *ppFin); - } - - return TSDB_CODE_SUCCESS; -} - -static int32_t mLeftJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { +static int32_t mOuterJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { SMJoinTableCtx* probe = pCtx->pJoin->probe; SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); @@ -221,7 +162,7 @@ static int32_t mLeftJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) { - MJ_ERR_RET(mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp)); + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp)); rowsLeft -= GRP_REMAIN_ROWS(buildGrp); buildGrp->readIdx = buildGrp->beginIdx; continue; @@ -230,7 +171,7 @@ static int32_t mLeftJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { int32_t buildEndIdx = buildGrp->endIdx; buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1; ASSERT(buildGrp->endIdx >= buildGrp->readIdx); - MJ_ERR_RET(mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp)); + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp)); buildGrp->readIdx += rowsLeft; buildGrp->endIdx = buildEndIdx; rowsLeft = 0; @@ -247,13 +188,13 @@ static 
int32_t mLeftJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { if (0 == pCtx->midBlk->info.rows) { if (build->grpIdx == buildGrpNum) { if (!probeGrp->readMatch) { - MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp)); + MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true)); } continue; } } else { - MJ_ERR_RET(mLeftJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk)); + MJ_ERR_RET(mJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk)); if (pCtx->midRemains) { contLoop = false; @@ -285,39 +226,158 @@ static int32_t mLeftJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static int32_t mLeftJoinMergeCart(SMJoinMergeCtx* pCtx) { - return (NULL == pCtx->pJoin->pFPreFilter) ? mLeftJoinMergeFullCart(pCtx) : mLeftJoinMergeSeqCart(pCtx); -} - -static int32_t mLeftJoinNonEqCart(SMJoinMergeCtx* pCtx) { - int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; - SMJoinGrpRows* probeGrp = &pCtx->probeNEqGrp; - if (rowsLeft <= 0) { - pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; +int32_t mJoinFilterMarkRowsMatch(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startRowIdx) { + if (pFilterInfo == NULL || pBlock->info.rows == 0) { return TSDB_CODE_SUCCESS; } - - int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); - pCtx->lastEqGrp = false; + SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; + SColumnInfoData* p = NULL; - if (probeRows <= rowsLeft) { - MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp)); - probeGrp->readIdx = probeGrp->endIdx + 1; - pCtx->grpRemains = false; - } else { - int32_t probeEndIdx = probeGrp->endIdx; - probeGrp->endIdx = probeGrp->readIdx + rowsLeft - 1; - MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp)); - probeGrp->readIdx = probeGrp->endIdx + 1; - probeGrp->endIdx = probeEndIdx; - pCtx->grpRemains = true; + int32_t 
code = filterSetDataFromSlotId(pFilterInfo, &param1); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + int32_t status = 0; + code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + extractQualifiedTupleByFilterResult(pBlock, p, status); + + code = TSDB_CODE_SUCCESS; + +_err: + colDataDestroy(p); + taosMemoryFree(p); + return code; +} + +static int32_t mOuterJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop) { + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; + SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); + int32_t startRowIdx = build->grpRowIdx; + + blockDataCleanup(pCtx->midBlk); + + do { + mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build); + + if (pCtx->midBlk->info.rows > 0) { + if (build->rowBitmapSize > 0) { + MJ_ERR_RET(mJoinFilterMarkRowsMatch(pCtx->midBlk, pCtx->pJoin->pPreFilter, build, startRowIdx)); + } else { + MJ_ERR_RET(doFilter(pCtx->midBlk, pCtx->pJoin->pPreFilter, NULL)); + } + if (pCtx->midBlk->info.rows > 0) { + probeGrp->readMatch = true; + } + } + + if (0 == pCtx->midBlk->info.rows) { + if (build->grpRowIdx < 0) { + if (!probeGrp->readMatch) { + MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true)); + } + + break; + } + + continue; + } else { + MJ_ERR_RET(mJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk)); + + if (pCtx->midRemains) { + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + *contLoop = false; + return TSDB_CODE_SUCCESS; + } + + if (build->grpRowIdx < 0) { + break; + } + + continue; + } + } while (true); + + *contLoop = true; + return TSDB_CODE_SUCCESS; +} + + +static int32_t mOuterJoinHashSeqCart(SMJoinMergeCtx* pCtx) { + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; + SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0); + bool contLoop = false; + + if (build->grpRowIdx >= 0) { + 
MJ_ERR_RET(mOuterJoinHashGrpCartFilter(pCtx, &contLoop)); + if (build->grpRowIdx < 0) { + probeGrp->readIdx++; + probeGrp->readMatch = false; + } + + if (!contLoop) { + goto _return; + } + } + + size_t bufLen = 0; + int32_t probeEndIdx = probeGrp->endIdx; + for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk);) { + if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) { + probeGrp->endIdx = probeGrp->readIdx; + MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true)); + probeGrp->endIdx = probeEndIdx; + probeGrp->readIdx++; + probeGrp->readMatch = false; + continue; + } + + SArray** pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen); + if (NULL == pGrp) { + probeGrp->endIdx = probeGrp->readIdx; + MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true)); + probeGrp->endIdx = probeEndIdx; + probeGrp->readIdx++; + probeGrp->readMatch = false; + } else { + build->pHashCurGrp = *pGrp; + build->grpRowIdx = 0; + + probeGrp->endIdx = probeGrp->readIdx; + MJ_ERR_RET(mOuterJoinHashGrpCartFilter(pCtx, &contLoop)); + probeGrp->endIdx = probeEndIdx; + if (build->grpRowIdx < 0) { + probeGrp->readIdx++; + probeGrp->readMatch = false; + } + + if (!contLoop) { + break; + } + } + } + +_return: + + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + return TSDB_CODE_SUCCESS; } +static int32_t mLeftJoinMergeCart(SMJoinMergeCtx* pCtx) { + return (NULL == pCtx->pJoin->pFPreFilter) ? 
mOuterJoinMergeFullCart(pCtx) : mOuterJoinMergeSeqCart(pCtx); +} + + + static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool buildGot = false; @@ -347,274 +407,16 @@ static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoi return true; } -static bool mLeftJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build) { - int32_t rowsLeft = append ? (pBlk->info.capacity - pBlk->info.rows) : pBlk->info.capacity; - if (rowsLeft <= 0) { - return false; - } - - int32_t buildGrpRows = taosArrayGetSize(build->pHashCurGrp); - int32_t grpRows = buildGrpRows - build->grpRowIdx; - if (grpRows <= 0 || build->grpRowIdx < 0) { - build->grpRowIdx = -1; - return true; - } - - int32_t actRows = TMIN(grpRows, rowsLeft); - int32_t currRows = append ? pBlk->info.rows : 0; - - for (int32_t c = 0; c < probe->finNum; ++c) { - SMJoinColMap* pFirstCol = probe->finCols + c; - SColumnInfoData* pInCol = taosArrayGet(probeGrp->blk->pDataBlock, pFirstCol->srcSlot); - SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pFirstCol->dstSlot); - if (colDataIsNull_s(pInCol, probeGrp->readIdx)) { - colDataSetNItemsNull(pOutCol, currRows, actRows); - } else { - colDataSetNItems(pOutCol, currRows, colDataGetData(pInCol, probeGrp->readIdx), actRows, true); - } - } - - for (int32_t c = 0; c < build->finNum; ++c) { - SMJoinColMap* pSecondCol = build->finCols + c; - SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pSecondCol->dstSlot); - for (int32_t r = 0; r < actRows; ++r) { - SMJoinRowPos* pRow = taosArrayGet(build->pHashCurGrp, build->grpRowIdx + r); - SColumnInfoData* pInCol = taosArrayGet(pRow->pBlk->pDataBlock, pSecondCol->srcSlot); - colDataAssignNRows(pOutCol, currRows + r, pInCol, pRow->pos, 1); - } - } - - pBlk->info.rows += actRows; - - if 
(actRows == grpRows) { - build->grpRowIdx = -1; - } else { - build->grpRowIdx += actRows; - } - - if (actRows == rowsLeft) { - return false; - } - - return true; -} - -static int32_t mLeftJoinHashFullCart(SMJoinMergeCtx* pCtx) { - SMJoinTableCtx* probe = pCtx->pJoin->probe; - SMJoinTableCtx* build = pCtx->pJoin->build; - SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); - - if (build->grpRowIdx >= 0) { - bool contLoop = mLeftJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build); - if (build->grpRowIdx < 0) { - probeGrp->readIdx++; - } - - if (!contLoop) { - goto _return; - } - } - - size_t bufLen = 0; - int32_t probeEndIdx = probeGrp->endIdx; - for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) { - if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) { - probeGrp->endIdx = probeGrp->readIdx; - MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp)); - probeGrp->endIdx = probeEndIdx; - continue; - } - - SArray** pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen); - if (NULL == pGrp) { - probeGrp->endIdx = probeGrp->readIdx; - MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp)); - probeGrp->endIdx = probeEndIdx; - } else { - build->pHashCurGrp = *pGrp; - build->grpRowIdx = 0; - bool contLoop = mLeftJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build); - if (!contLoop) { - if (build->grpRowIdx < 0) { - probeGrp->readIdx++; - } - goto _return; - } - } - } - -_return: - - pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; - - return TSDB_CODE_SUCCESS; -} - -static int32_t mLeftJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop) { - SMJoinTableCtx* probe = pCtx->pJoin->probe; - SMJoinTableCtx* build = pCtx->pJoin->build; - SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); - - blockDataCleanup(pCtx->midBlk); - - do { - mLeftJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build); - - if 
(pCtx->midBlk->info.rows > 0) { - MJ_ERR_RET(doFilter(pCtx->midBlk, pCtx->pJoin->pPreFilter, NULL)); - if (pCtx->midBlk->info.rows > 0) { - probeGrp->readMatch = true; - } - } - - if (0 == pCtx->midBlk->info.rows) { - if (build->grpRowIdx < 0) { - if (!probeGrp->readMatch) { - MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp)); - } - - break; - } - - continue; - } else { - MJ_ERR_RET(mLeftJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk)); - - if (pCtx->midRemains) { - pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; - *contLoop = false; - return TSDB_CODE_SUCCESS; - } - - if (build->grpRowIdx < 0) { - break; - } - - continue; - } - } while (true); - - *contLoop = true; - return TSDB_CODE_SUCCESS; -} - - -static int32_t mLeftJoinHashSeqCart(SMJoinMergeCtx* pCtx) { - SMJoinTableCtx* probe = pCtx->pJoin->probe; - SMJoinTableCtx* build = pCtx->pJoin->build; - SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0); - bool contLoop = false; - - if (build->grpRowIdx >= 0) { - MJ_ERR_RET(mLeftJoinHashGrpCartFilter(pCtx, &contLoop)); - if (build->grpRowIdx < 0) { - probeGrp->readIdx++; - probeGrp->readMatch = false; - } - - if (!contLoop) { - goto _return; - } - } - - size_t bufLen = 0; - int32_t probeEndIdx = probeGrp->endIdx; - for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk);) { - if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) { - probeGrp->endIdx = probeGrp->readIdx; - MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp)); - probeGrp->endIdx = probeEndIdx; - probeGrp->readIdx++; - probeGrp->readMatch = false; - continue; - } - - SArray** pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen); - if (NULL == pGrp) { - probeGrp->endIdx = probeGrp->readIdx; - MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp)); - probeGrp->endIdx = probeEndIdx; - probeGrp->readIdx++; - probeGrp->readMatch = false; - } else { - build->pHashCurGrp = *pGrp; - 
build->grpRowIdx = 0; - - probeGrp->endIdx = probeGrp->readIdx; - MJ_ERR_RET(mLeftJoinHashGrpCartFilter(pCtx, &contLoop)); - probeGrp->endIdx = probeEndIdx; - if (build->grpRowIdx < 0) { - probeGrp->readIdx++; - probeGrp->readMatch = false; - } - - if (!contLoop) { - break; - } - } - } - -_return: - - pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; - - return TSDB_CODE_SUCCESS; -} - - static int32_t mLeftJoinHashCart(SMJoinMergeCtx* pCtx) { - return (NULL == pCtx->pJoin->pPreFilter) ? mLeftJoinHashFullCart(pCtx) : mLeftJoinHashSeqCart(pCtx); + return (NULL == pCtx->pJoin->pPreFilter) ? mOuterJoinHashFullCart(pCtx) : mOuterJoinHashSeqCart(pCtx); } -static int32_t mLeftJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastBuildGrp) { - SMJoinOperatorInfo* pJoin = pCtx->pJoin; - - pCtx->lastEqGrp = true; - - mJoinBuildEqGroups(pJoin->probe, timestamp, NULL, true); - if (!lastBuildGrp) { - mJoinRetrieveEqGrpRows(pJoin->pOperator, pJoin->build, timestamp); - } else { - pJoin->build->grpIdx = 0; - } - - if (pCtx->hashCan && REACH_HJOIN_THRESHOLD(pJoin->probe, pJoin->build)) { - if (!lastBuildGrp || !pCtx->hashJoin) { - MJ_ERR_RET(mJoinMakeBuildTbHash(pJoin, pJoin->build)); - } - - if (pJoin->probe->newBlk) { - MJ_ERR_RET(mJoinSetKeyColsData(pJoin->probe->blk, pJoin->probe)); - pJoin->probe->newBlk = false; - } - - pCtx->hashJoin = true; - - return mLeftJoinHashCart(pCtx); - } - - pCtx->hashJoin = false; - - return mLeftJoinMergeCart(pCtx); -} - -static int32_t mLeftJoinHandleMidRemains(SMJoinMergeCtx* pCtx) { - ASSERT(0 < pCtx->midBlk->info.rows); - - TSWAP(pCtx->midBlk, pCtx->finBlk); - - pCtx->midRemains = false; - - return TSDB_CODE_SUCCESS; -} - - -static int32_t mLeftJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { +static FORCE_INLINE int32_t mLeftJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { if (pCtx->lastEqGrp) { - return (pCtx->hashJoin) ? mLeftJoinHashCart(pCtx) : mLeftJoinMergeCart(pCtx); + return (pCtx->hashJoin) ? 
(*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx); } - return mLeftJoinNonEqCart(pCtx); + return mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true); } SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { @@ -625,12 +427,11 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { int64_t buildTs = 0; SColumnInfoData* pBuildCol = NULL; SColumnInfoData* pProbeCol = NULL; - bool asc = (pJoin->inputTsOrder == TSDB_ORDER_ASC) ? true : false; blockDataCleanup(pCtx->finBlk); if (pCtx->midRemains) { - MJ_ERR_JRET(mLeftJoinHandleMidRemains(pCtx)); + MJ_ERR_JRET(mJoinHandleMidRemains(pCtx)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -654,7 +455,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); if (probeTs == pCtx->lastEqTs) { - MJ_ERR_JRET(mLeftJoinProcessEqualGrp(pCtx, probeTs, true)); + MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -669,37 +470,22 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { if (probeTs == buildTs) { pCtx->lastEqTs = probeTs; - MJ_ERR_JRET(mLeftJoinProcessEqualGrp(pCtx, probeTs, false)); + MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); - } else if (LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)) { - pCtx->probeNEqGrp.blk = pJoin->probe->blk; - pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx; - pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx; - pCtx->probeNEqGrp.endIdx = pCtx->probeNEqGrp.beginIdx; - - while (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) { - MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); - if 
(LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)) { - pCtx->probeNEqGrp.endIdx = pJoin->probe->blkRowIdx; - continue; - } - - break; - } - - MJ_ERR_JRET(mLeftJoinNonEqCart(pCtx)); + } else if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) { + MJ_ERR_JRET(mJoinProcessNonEqualGrp(pCtx, pProbeCol, true, &probeTs, &buildTs)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } } else { while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build); - if (LEFT_JOIN_DISCRAD(asc, probeTs, buildTs)) { + if (PROBE_TS_OVER(pCtx->ascTs, probeTs, buildTs)) { continue; } @@ -716,7 +502,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; - MJ_ERR_JRET(mLeftJoinNonEqCart(pCtx)); + MJ_ERR_JRET(mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -733,6 +519,118 @@ _return: return pCtx->finBlk; } +static int32_t mInnerJoinMergeCart(SMJoinMergeCtx* pCtx) { + int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; + SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0); + int32_t buildGrpNum = taosArrayGetSize(build->eqGrps); + int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); + int32_t probeEndIdx = probeGrp->endIdx; + + if (0 == build->grpIdx && probeRows * build->grpTotalRows <= rowsLeft) { + SMJoinGrpRows* pFirstBuild = taosArrayGet(build->eqGrps, 0); + if (pFirstBuild->readIdx == pFirstBuild->beginIdx) { + for (; build->grpIdx < buildGrpNum; ++build->grpIdx) { + SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); + buildGrp->readIdx = buildGrp->beginIdx; + } + + pCtx->grpRemains = false; + return TSDB_CODE_SUCCESS; + } + } + + for (; 
!GRP_DONE(probeGrp); ) { + probeGrp->endIdx = probeGrp->readIdx; + for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) { + SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); + + if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) { + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); + rowsLeft -= GRP_REMAIN_ROWS(buildGrp); + buildGrp->readIdx = buildGrp->beginIdx; + continue; + } + + int32_t buildEndIdx = buildGrp->endIdx; + buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1; + mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp); + buildGrp->readIdx += rowsLeft; + buildGrp->endIdx = buildEndIdx; + rowsLeft = 0; + break; + } + probeGrp->endIdx = probeEndIdx; + + if (build->grpIdx >= buildGrpNum) { + build->grpIdx = 0; + ++probeGrp->readIdx; + } + + if (rowsLeft <= 0) { + break; + } + } + + probeGrp->endIdx = probeEndIdx; + + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + + return TSDB_CODE_SUCCESS; +} + + +static int32_t mInnerJoinHashCart(SMJoinMergeCtx* pCtx) { + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; + SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); + + if (build->grpRowIdx >= 0) { + bool contLoop = mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build); + if (build->grpRowIdx < 0) { + probeGrp->readIdx++; + } + + if (!contLoop) { + goto _return; + } + } + + size_t bufLen = 0; + int32_t probeEndIdx = probeGrp->endIdx; + for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) { + if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) { + continue; + } + + SArray** pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen); + if (NULL != pGrp) { + build->pHashCurGrp = *pGrp; + build->grpRowIdx = 0; + bool contLoop = mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build); + if (!contLoop) { + if (build->grpRowIdx < 0) { + probeGrp->readIdx++; + } + goto 
_return; + } + } + } + +_return: + + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE int32_t mInnerJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { + return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx); +} + + static bool mInnerJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool buildGot = false; @@ -758,12 +656,11 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) { int64_t buildTs = 0; SColumnInfoData* pBuildCol = NULL; SColumnInfoData* pProbeCol = NULL; - bool asc = (pJoin->inputTsOrder == TSDB_ORDER_ASC) ? true : false; blockDataCleanup(pCtx->finBlk); if (pCtx->grpRemains) { - MJ_ERR_JRET(mLeftJoinHandleGrpRemains(pCtx)); + MJ_ERR_JRET(mInnerJoinHandleGrpRemains(pCtx)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -779,7 +676,124 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) { MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); if (probeTs == pCtx->lastEqTs) { - MJ_ERR_JRET(mLeftJoinProcessEqualGrp(pCtx, probeTs, true)); + MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + + if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) { + continue; + } else { + MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); + } + } + + do { + if (probeTs == buildTs) { + pCtx->lastEqTs = probeTs; + MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + + if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) || MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { + break; + } + + MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); + MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); + continue; + } else if (PROBE_TS_UNREACH(pCtx->ascTs, 
probeTs, buildTs)) { + if (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) { + MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); + continue; + } + } else { + if (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { + MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build); + continue; + } + } + + break; + } while (true); + } while (true); + +_return: + + if (code) { + pJoin->errCode = code; + return NULL; + } + + return pCtx->finBlk; +} + +static FORCE_INLINE int32_t mFullJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { + if (pCtx->lastEqGrp) { + return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx); + } + + return pCtx->lastProbeGrp ? mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true) : mJoinNonEqCart(pCtx, &pCtx->buildNEqGrp, false); +} + +static bool mFullJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { + bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); + bool buildGot = false; + + if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { + buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); + } + + if (!probeGot && !buildGot) { + mJoinSetDone(pOperator); + return false; + } + + return true; +} + +static FORCE_INLINE int32_t mFullJoinHashCart(SMJoinMergeCtx* pCtx) { + return (NULL == pCtx->pJoin->pPreFilter) ? 
mOuterJoinHashFullCart(pCtx) : mOuterJoinHashSeqCart(pCtx); +} + +SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { + SMJoinOperatorInfo* pJoin = pOperator->info; + SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; + int32_t code = TSDB_CODE_SUCCESS; + int64_t probeTs = 0; + int64_t buildTs = 0; + SColumnInfoData* pBuildCol = NULL; + SColumnInfoData* pProbeCol = NULL; + + blockDataCleanup(pCtx->finBlk); + + if (pCtx->midRemains) { + MJ_ERR_JRET(mJoinHandleMidRemains(pCtx)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + pCtx->midRemains = false; + } + + if (pCtx->grpRemains) { + MJ_ERR_JRET(mFullJoinHandleGrpRemains(pCtx)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + pCtx->grpRemains = false; + } + + do { + if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) { + break; + } + + MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); + MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); + + if (probeTs == pCtx->lastEqTs) { + MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -794,46 +808,29 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) { while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { if (probeTs == buildTs) { pCtx->lastEqTs = probeTs; - MJ_ERR_JRET(mLeftJoinProcessEqualGrp(pCtx, probeTs, false)); + MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); - } else if (LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)) { - pCtx->probeNEqGrp.blk = pJoin->probe->blk; - pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx; - pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx; - pCtx->probeNEqGrp.endIdx = pCtx->probeNEqGrp.beginIdx; - while (++pJoin->probe->blkRowIdx < 
pJoin->probe->blk->info.rows) { - MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); - if (LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)) { - pCtx->probeNEqGrp.endIdx = pJoin->probe->blkRowIdx; - continue; - } - - break; - } - - MJ_ERR_JRET(mLeftJoinNonEqCart(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { - return pCtx->finBlk; - } + continue; + } + + if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) { + MJ_ERR_JRET(mJoinProcessNonEqualGrp(pCtx, pProbeCol, true, &probeTs, &buildTs)); } else { - while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { - MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build); - if (LEFT_JOIN_DISCRAD(asc, probeTs, buildTs)) { - continue; - } - - break; - } + MJ_ERR_JRET(mJoinProcessNonEqualGrp(pCtx, pBuildCol, false, &probeTs, &buildTs)); + } + + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; } } - if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) { + if (pJoin->build->dsFetchDone && !MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) { pCtx->probeNEqGrp.blk = pJoin->probe->blk; pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx; pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx; @@ -841,11 +838,26 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) { pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; - MJ_ERR_JRET(mLeftJoinNonEqCart(pCtx)); + MJ_ERR_JRET(mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } } + + if (pJoin->probe->dsFetchDone && !MJOIN_PROBE_TB_ROWS_DONE(pJoin->build)) { + pCtx->buildNEqGrp.blk = pJoin->build->blk; + pCtx->buildNEqGrp.beginIdx = pJoin->build->blkRowIdx; + pCtx->buildNEqGrp.readIdx = pCtx->buildNEqGrp.beginIdx; + pCtx->buildNEqGrp.endIdx = pJoin->build->blk->info.rows - 1; + + pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; + + MJ_ERR_JRET(mJoinNonEqCart(pCtx, &pCtx->buildNEqGrp, false)); + if (pCtx->finBlk->info.rows >= 
pCtx->blkThreshold) { + return pCtx->finBlk; + } + } + } while (true); _return: @@ -859,4 +871,46 @@ _return: } +int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { + SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; + + pCtx->pJoin = pJoin; + pCtx->lastEqTs = INT64_MIN; + pCtx->hashCan = pJoin->probe->keyNum > 0; + + if (pJoinNode->node.inputTsOrder != ORDER_DESC) { + pCtx->ascTs = true; + } + + pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); + blockDataEnsureCapacity(pCtx->finBlk, TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize)); + + if (pJoin->pFPreFilter) { + pCtx->midBlk = createOneDataBlock(pCtx->finBlk, false); + blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity); + } + + pCtx->blkThreshold = pCtx->finBlk->info.capacity * 0.5; + + switch (pJoin->joinType) { + case JOIN_TYPE_INNER: + pCtx->hashCartFp = mInnerJoinHashCart; + pCtx->mergeCartFp = mInnerJoinMergeCart; + break; + case JOIN_TYPE_LEFT: + case JOIN_TYPE_RIGHT: + pCtx->hashCartFp = mLeftJoinHashCart; + pCtx->mergeCartFp = mLeftJoinMergeCart; + break; + case JOIN_TYPE_FULL: + pCtx->hashCartFp = mFullJoinHashCart; + pCtx->mergeCartFp = mFullJoinMergeCart; + break; + default: + break; + } + + return TSDB_CODE_SUCCESS; +} + diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 0c6341e222..67a10eedaa 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -27,6 +27,254 @@ #include "ttypes.h" #include "mergejoin.h" +int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) { + SSDataBlock* pLess = NULL; + SSDataBlock* pMore = NULL; + if ((*ppMid)->info.rows < (*ppFin)->info.rows) { + pLess = (*ppMid); + pMore = (*ppFin); + } else { + pLess = (*ppFin); + pMore = (*ppMid); + } + + int32_t totalRows = pMore->info.rows + 
pLess->info.rows; + if (totalRows <= pMore->info.capacity) { + MJ_ERR_RET(blockDataMerge(pMore, pLess)); + blockDataCleanup(pLess); + pCtx->midRemains = false; + } else { + int32_t copyRows = pMore->info.capacity - pMore->info.rows; + MJ_ERR_RET(blockDataMergeNRows(pMore, pLess, pLess->info.rows - copyRows, copyRows)); + blockDataShrinkNRows(pLess, copyRows); + pCtx->midRemains = true; + } + + if (pMore != (*ppFin)) { + TSWAP(*ppMid, *ppFin); + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx) { + ASSERT(0 < pCtx->midBlk->info.rows); + + TSWAP(pCtx->midBlk, pCtx->finBlk); + + pCtx->midRemains = false; + + return TSDB_CODE_SUCCESS; +} + +int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp, bool probeGrp) { + SMJoinTableCtx* probe = probeGrp ? pJoin->probe : pJoin->build; + SMJoinTableCtx* build = probeGrp ? pJoin->build : pJoin->probe; + int32_t currRows = append ? pRes->info.rows : 0; + int32_t firstRows = GRP_REMAIN_ROWS(pGrp); + + for (int32_t c = 0; c < probe->finNum; ++c) { + SMJoinColMap* pFirstCol = probe->finCols + c; + SColumnInfoData* pInCol = taosArrayGet(pGrp->blk->pDataBlock, pFirstCol->srcSlot); + SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot); + colDataAssignNRows(pOutCol, currRows, pInCol, pGrp->readIdx, firstRows); + } + + for (int32_t c = 0; c < build->finNum; ++c) { + SMJoinColMap* pSecondCol = build->finCols + c; + SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot); + colDataSetNItemsNull(pOutCol, currRows, firstRows); + } + + pRes->info.rows = append ? 
(pRes->info.rows + firstRows) : firstRows; + return TSDB_CODE_SUCCESS; +} + + +int32_t mJoinNonEqCart(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp) { + pCtx->lastEqGrp = false; + pCtx->lastProbeGrp = probeGrp; + + int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; + if (rowsLeft <= 0) { + pCtx->grpRemains = pGrp->readIdx <= pGrp->endIdx; + return TSDB_CODE_SUCCESS; + } + + if (GRP_REMAIN_ROWS(pGrp) <= rowsLeft) { + MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, pGrp, probeGrp)); + pGrp->readIdx = pGrp->endIdx + 1; + pCtx->grpRemains = false; + } else { + int32_t endIdx = pGrp->endIdx; + pGrp->endIdx = pGrp->readIdx + rowsLeft - 1; + MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, pGrp, probeGrp)); + pGrp->readIdx = pGrp->endIdx + 1; + pGrp->endIdx = endIdx; + pCtx->grpRemains = true; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond) { + SMJoinTableCtx* probe = pJoin->probe; + SMJoinTableCtx* build = pJoin->build; + int32_t currRows = append ? pRes->info.rows : 0; + int32_t firstRows = GRP_REMAIN_ROWS(pFirst); + int32_t secondRows = GRP_REMAIN_ROWS(pSecond); + ASSERT(secondRows > 0); + + for (int32_t c = 0; c < probe->finNum; ++c) { + SMJoinColMap* pFirstCol = probe->finCols + c; + SColumnInfoData* pInCol = taosArrayGet(pFirst->blk->pDataBlock, pFirstCol->srcSlot); + SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot); + for (int32_t r = 0; r < firstRows; ++r) { + if (colDataIsNull_s(pInCol, pFirst->readIdx + r)) { + colDataSetNItemsNull(pOutCol, currRows + r * secondRows, secondRows); + } else { + ASSERT(pRes->info.capacity >= (pRes->info.rows + firstRows * secondRows)); + uint32_t startOffset = (IS_VAR_DATA_TYPE(pOutCol->info.type)) ? 
pOutCol->varmeta.length : ((currRows + r * secondRows) * pOutCol->info.bytes); + ASSERT((startOffset + 1 * pOutCol->info.bytes) <= pRes->info.capacity * pOutCol->info.bytes); + colDataSetNItems(pOutCol, currRows + r * secondRows, colDataGetData(pInCol, pFirst->readIdx + r), secondRows, true); + } + } + } + + for (int32_t c = 0; c < build->finNum; ++c) { + SMJoinColMap* pSecondCol = build->finCols + c; + SColumnInfoData* pInCol = taosArrayGet(pSecond->blk->pDataBlock, pSecondCol->srcSlot); + SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot); + for (int32_t r = 0; r < firstRows; ++r) { + colDataAssignNRows(pOutCol, currRows + r * secondRows, pInCol, pSecond->readIdx, secondRows); + } + } + + pRes->info.rows = append ? (pRes->info.rows + firstRows * secondRows) : firstRows * secondRows; + return TSDB_CODE_SUCCESS; +} + + + +bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build) { + int32_t rowsLeft = append ? (pBlk->info.capacity - pBlk->info.rows) : pBlk->info.capacity; + if (rowsLeft <= 0) { + return false; + } + + int32_t buildGrpRows = taosArrayGetSize(build->pHashCurGrp); + int32_t grpRows = buildGrpRows - build->grpRowIdx; + if (grpRows <= 0 || build->grpRowIdx < 0) { + build->grpRowIdx = -1; + return true; + } + + int32_t actRows = TMIN(grpRows, rowsLeft); + int32_t currRows = append ? 
pBlk->info.rows : 0; + + for (int32_t c = 0; c < probe->finNum; ++c) { + SMJoinColMap* pFirstCol = probe->finCols + c; + SColumnInfoData* pInCol = taosArrayGet(probeGrp->blk->pDataBlock, pFirstCol->srcSlot); + SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pFirstCol->dstSlot); + if (colDataIsNull_s(pInCol, probeGrp->readIdx)) { + colDataSetNItemsNull(pOutCol, currRows, actRows); + } else { + colDataSetNItems(pOutCol, currRows, colDataGetData(pInCol, probeGrp->readIdx), actRows, true); + } + } + + for (int32_t c = 0; c < build->finNum; ++c) { + SMJoinColMap* pSecondCol = build->finCols + c; + SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pSecondCol->dstSlot); + for (int32_t r = 0; r < actRows; ++r) { + SMJoinRowPos* pRow = taosArrayGet(build->pHashCurGrp, build->grpRowIdx + r); + SColumnInfoData* pInCol = taosArrayGet(pRow->pBlk->pDataBlock, pSecondCol->srcSlot); + colDataAssignNRows(pOutCol, currRows + r, pInCol, pRow->pos, 1); + } + } + + pBlk->info.rows += actRows; + + if (actRows == grpRows) { + build->grpRowIdx = -1; + } else { + build->grpRowIdx += actRows; + } + + if (actRows == rowsLeft) { + return false; + } + + return true; +} + + +int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastBuildGrp) { + SMJoinOperatorInfo* pJoin = pCtx->pJoin; + + pCtx->lastEqGrp = true; + + mJoinBuildEqGroups(pJoin->probe, timestamp, NULL, true); + if (!lastBuildGrp) { + mJoinRetrieveEqGrpRows(pJoin->pOperator, pJoin->build, timestamp); + } else { + pJoin->build->grpIdx = 0; + } + + if (pCtx->hashCan && REACH_HJOIN_THRESHOLD(pJoin->probe, pJoin->build)) { + if (!lastBuildGrp || !pCtx->hashJoin) { + MJ_ERR_RET(mJoinMakeBuildTbHash(pJoin, pJoin->build)); + } + + if (pJoin->probe->newBlk) { + MJ_ERR_RET(mJoinSetKeyColsData(pJoin->probe->blk, pJoin->probe)); + pJoin->probe->newBlk = false; + } + + pCtx->hashJoin = true; + + return (*pCtx->hashCartFp)(pCtx); + } + + pCtx->hashJoin = false; + + return (*pCtx->mergeCartFp)(pCtx); +} + 
+int32_t mJoinProcessNonEqualGrp(SMJoinMergeCtx* pCtx, SColumnInfoData* pCol, bool probeGrp, int64_t* probeTs, int64_t* buildTs) { + SMJoinGrpRows* pGrp; + SMJoinTableCtx* pTb; + int64_t* pTs; + + if (probeGrp) { + pGrp = &pCtx->probeNEqGrp; + pTb = pCtx->pJoin->probe; + pTs = probeTs; + } else { + pGrp = &pCtx->buildNEqGrp; + pTb = pCtx->pJoin->build; + pTs = buildTs; + } + + pGrp->blk = pTb->blk; + pGrp->beginIdx = pTb->blkRowIdx; + pGrp->readIdx = pGrp->beginIdx; + pGrp->endIdx = pGrp->beginIdx; + + while (++pTb->blkRowIdx < pTb->blk->info.rows) { + MJOIN_GET_TB_CUR_TS(pCol, *pTs, pTb); + if (PROBE_TS_UNREACH(pCtx->ascTs, *probeTs, *buildTs)) { + pGrp->endIdx = pTb->blkRowIdx; + continue; + } + + break; + } + + return mJoinNonEqCart(pCtx, pGrp, ); +} + SOperatorInfo** mJoinBuildDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream) { SOperatorInfo** p = taosMemoryMalloc(2 * POINTER_BYTES); if (p) { @@ -145,6 +393,14 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi if (NULL == pTable->createdBlks || NULL == pTable->pGrpArrays || NULL == pTable->pGrpHash) { return TSDB_CODE_OUT_OF_MEMORY; } + + if (pJoin->pPreFilter && IS_FULL_OUTER_JOIN(pJoin->joinType, pJoin->subType)) { + pTable->rowBitmapSize = MJOIN_ROW_BITMAP_SIZE; + pTable->pRowBitmap = taosMemoryMalloc(pTable->rowBitmapSize); + if (NULL == pTable->pRowBitmap) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } } return TSDB_CODE_SUCCESS; @@ -245,12 +501,32 @@ static void mJoinDestroyCreatedBlks(SArray* pCreatedBlks) { taosArrayClear(pCreatedBlks); } -void mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) { +static int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t *rowBitmapOffset) { + int32_t bitmapLen = BitmapLen(rowNum); + int64_t reqSize = pTable->rowBitmapOffset + bitmapLen; + if (reqSize > pTable->rowBitmapSize) { + int64_t newSize = reqSize * 1.1; + pTable->pRowBitmap = 
taosMemoryRealloc(pTable->pRowBitmap, newSize); + if (NULL == pTable->pRowBitmap) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pTable->rowBitmapSize = newSize; + } + + memset(pTable->pRowBitmap + pTable->rowBitmapOffset, 0, bitmapLen); + + *rowBitmapOffset = pTable->rowBitmapOffset; + pTable->rowBitmapOffset += bitmapLen; + + return TSDB_CODE_SUCCESS; +} + +int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) { SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); SMJoinGrpRows* pGrp = NULL; if (*(int64_t*)colDataGetData(pCol, pTable->blkRowIdx) != timestamp) { - return; + return TSDB_CODE_SUCCESS; } if (restart) { @@ -275,8 +551,7 @@ void mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBl continue; } - pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1; - return; + goto _return; } if (wholeBlk) { @@ -292,7 +567,16 @@ void mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBl taosArrayPush(pTable->createdBlks, &pGrp->blk); } +_return: + + if (wholeBlk && pTable->rowBitmapSize > 0) { + MJ_ERR_RET(mJoinGetRowBitmapOffset(pTable, pGrp->endIdx - pGrp->beginIdx + 1, &pGrp->rowBitmapOffset)); + pGrp->rowMatchNum = 0; + } + pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1; + + return TSDB_CODE_SUCCESS; } @@ -533,6 +817,7 @@ void destroyMergeJoinTableCtx(SMJoinTableCtx* pTable) { taosMemoryFree(pTable->finCols); taosMemoryFree(pTable->keyCols); taosMemoryFree(pTable->keyBuf); + taosMemoryFree(pTable->pRowBitmap); taosArrayDestroy(pTable->eqGrps); taosArrayDestroyEx(pTable->pGrpArrays, destroyGrpArray); @@ -562,6 +847,41 @@ void destroyMergeJoinOperator(void* param) { taosMemoryFreeClear(pJoin); } +int32_t mJoinHandleConds(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { + switch (pJoin->joinType) { + case JOIN_TYPE_INNER: { + SNode* pCond = NULL; + if (pJoinNode->pFullOnCond != NULL) { + if 
(pJoinNode->node.pConditions != NULL) { + MJ_ERR_RET(mergeJoinConds(&pJoinNode->pFullOnCond, &pJoinNode->node.pConditions)); + } + pCond = pJoinNode->pFullOnCond; + } else if (pJoinNode->node.pConditions != NULL) { + pCond = pJoinNode->node.pConditions; + } + + MJ_ERR_RET(filterInitFromNode(pCond, &pJoin->pFinFilter, 0)); + break; + } + case JOIN_TYPE_LEFT: + case JOIN_TYPE_RIGHT: + case JOIN_TYPE_FULL: + if (pJoinNode->pFullOnCond != NULL) { + MJ_ERR_RET(filterInitFromNode(pJoinNode->pFullOnCond, &pJoin->pFPreFilter, 0)); + } + if (pJoinNode->pColOnCond != NULL) { + MJ_ERR_RET(filterInitFromNode(pJoinNode->pColOnCond, &pJoin->pPreFilter, 0)); + } + if (pJoinNode->node.pConditions != NULL) { + MJ_ERR_RET(filterInitFromNode(pJoinNode->node.pConditions, &pJoin->pFinFilter, 0)); + } + break; + default: + break; + } + +} + SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { @@ -582,31 +902,13 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t mJoinSetBuildAndProbeTable(pInfo, pJoinNode); + MJ_ERR_JRET(mJoinHandleConds(pInfo, pJoinNode)); + mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); - - if (pJoinNode->pFullOnCond != NULL) { - MJ_ERR_JRET(filterInitFromNode(pJoinNode->pFullOnCond, &pInfo->pFPreFilter, 0)); - } - - if (pJoinNode->pColOnCond != NULL) { - MJ_ERR_JRET(filterInitFromNode(pJoinNode->pColOnCond, &pInfo->pPreFilter, 0)); - } - - if (pJoinNode->node.pConditions != NULL) { - MJ_ERR_JRET(filterInitFromNode(pJoinNode->node.pConditions, &pInfo->pFinFilter, 0)); - } MJ_ERR_JRET(mJoinInitCtx(pInfo, pJoinNode)); - if (pJoinNode->node.inputTsOrder == ORDER_ASC) { - pInfo->inputTsOrder = TSDB_ORDER_ASC; - } else if (pJoinNode->node.inputTsOrder == ORDER_DESC) { - pInfo->inputTsOrder = TSDB_ORDER_DESC; - } else { - 
pInfo->inputTsOrder = TSDB_ORDER_ASC; - } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mJoinMainProcess, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); MJ_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream)); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index bb1f98289a..9df8acf683 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -65,6 +65,54 @@ char* getFullJoinTypeString(EJoinType type, EJoinSubType stype) { return joinFullType[type][stype]; } + +int32_t mergeJoinConds(SNode** ppDst, SNode** ppSrc) { + if (NULL == *ppSrc) { + return TSDB_CODE_SUCCESS; + } + if (NULL == *ppDst) { + *ppDst = *ppSrc; + *ppSrc = NULL; + return TSDB_CODE_SUCCESS; + } + if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppSrc) && ((SLogicConditionNode*)(*ppSrc))->condType == LOGIC_COND_TYPE_AND) { + TSWAP(*ppDst, *ppSrc); + } + if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppDst)) { + SLogicConditionNode* pDst = (SLogicConditionNode*)*ppDst; + if (pDst->condType == LOGIC_COND_TYPE_AND) { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppSrc) && ((SLogicConditionNode*)(*ppSrc))->condType == LOGIC_COND_TYPE_AND) { + nodesListStrictAppendList(pDst->pParameterList, ((SLogicConditionNode*)(*ppSrc))->pParameterList); + ((SLogicConditionNode*)(*ppSrc))->pParameterList = NULL; + } else { + nodesListStrictAppend(pDst->pParameterList, *ppSrc); + *ppSrc = NULL; + } + nodesDestroyNode(*ppSrc); + *ppSrc = NULL; + + return TSDB_CODE_SUCCESS; + } + } + + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); + if (NULL == pLogicCond) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pLogicCond->node.resType.type = TSDB_DATA_TYPE_BOOL; + pLogicCond->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; + pLogicCond->condType = LOGIC_COND_TYPE_AND; + pLogicCond->pParameterList = nodesMakeList(); + 
nodesListStrictAppend(pLogicCond->pParameterList, *ppSrc); + nodesListStrictAppend(pLogicCond->pParameterList, *ppDst); + + *ppDst = (SNode*)pLogicCond; + *ppSrc = NULL; + + return TSDB_CODE_SUCCESS; +} + + static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) { SNodeMemChunk* pNewChunk = taosMemoryCalloc(1, sizeof(SNodeMemChunk) + pAllocator->chunkSize); if (NULL == pNewChunk) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index c44c8e89b9..52aebfa66e 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -76,6 +76,7 @@ typedef enum ECondAction { #define PUSH_DOWN_LEFT_FLT (1 << 0) #define PUSH_DOWN_RIGHT_FLT (1 << 1) #define PUSH_DOWN_ON_COND (1 << 2) +#define PUSH_DONW_FLT_COND (PUSH_DOWN_LEFT_FLT | PUSH_DOWN_RIGHT_FLT) #define PUSH_DOWN_ALL_COND (PUSH_DOWN_LEFT_FLT | PUSH_DOWN_RIGHT_FLT | PUSH_DOWN_ON_COND) typedef struct SJoinOptimizeOpt { @@ -94,7 +95,7 @@ static SJoinOptimizeOpt gJoinOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = { /*FULL*/ {{0}, {0}, {0}, {0}, {0}, {0}, {0}}, }; #else -static SJoinOptimizeOpt gJoinOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = { +static SJoinOptimizeOpt gJoinWhereOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = { /* NONE OUTER SEMI ANTI ASOF WINDOW */ /*INNER*/ {{PUSH_DOWN_ALL_COND}, {0}, {0}, {0}, {0}, {0}}, /*LEFT*/ {{0}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_ALL_COND}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}}, @@ -102,6 +103,15 @@ static SJoinOptimizeOpt gJoinOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = { /*FULL*/ {{0}, {0}, {0}, {0}, {0}, {0}}, }; +static SJoinOptimizeOpt gJoinOnOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = { + /* NONE OUTER SEMI ANTI ASOF WINDOW */ +/*INNER*/ {{PUSH_DONW_FLT_COND}, {0}, {0}, {0}, {0}, {0}}, +/*LEFT*/ {{0}, {0}, {PUSH_DONW_FLT_COND}, {0}, {0}, {0}}, +/*RIGHT*/ {{0}, {0}, {PUSH_DONW_FLT_COND}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_RIGHT_FLT}, 
{PUSH_DOWN_RIGHT_FLT}}, +/*FULL*/ {{0}, {0}, {0}, {0}, {0}, {0}}, +}; + + #endif static SLogicNode* optFindPossibleNode(SLogicNode* pNode, FMayBeOptimized func) { @@ -575,19 +585,19 @@ static ECondAction pdcJoinGetCondAction(SJoinLogicNode* pJoin, SSHashObj* pLeftT if (cxt.havaLeftCol) { if (cxt.haveRightCol) { - if ((!whereCond) || (gJoinOpt[t][s].pushDownFlag & PUSH_DOWN_ON_COND)) { + if (whereCond && gJoinWhereOpt[t][s].pushDownFlag & PUSH_DOWN_ON_COND) { return COND_ACTION_PUSH_JOIN; } return COND_ACTION_STAY; } - if ((!whereCond) || (gJoinOpt[t][s].pushDownFlag & PUSH_DOWN_LEFT_FLT)) { + if ((whereCond && gJoinWhereOpt[t][s].pushDownFlag & PUSH_DOWN_LEFT_FLT) || (!whereCond && gJoinOnOpt[t][s].pushDownFlag & PUSH_DOWN_LEFT_FLT)) { return COND_ACTION_PUSH_LEFT_CHILD; } return COND_ACTION_STAY; } if (cxt.haveRightCol) { - if ((!whereCond) || (gJoinOpt[t][s].pushDownFlag & PUSH_DOWN_RIGHT_FLT)) { + if ((whereCond && gJoinWhereOpt[t][s].pushDownFlag & PUSH_DOWN_RIGHT_FLT) || (!whereCond && gJoinOnOpt[t][s].pushDownFlag & PUSH_DOWN_RIGHT_FLT)) { return COND_ACTION_PUSH_RIGHT_CHILD; } return COND_ACTION_STAY; @@ -1003,6 +1013,7 @@ static int32_t pdcJoinCollectColsFromParent(SJoinLogicNode* pJoin, SSHashObj* pT } nodesWalkExpr(pJoin->pPrimKeyEqCond, pdcJoinCollectCondCol, &cxt); + nodesWalkExpr(pJoin->node.pConditions, pdcJoinCollectCondCol, &cxt); if (TSDB_CODE_SUCCESS == cxt.errCode) { nodesWalkExpr(pJoin->pFullOnCond, pdcJoinCollectCondCol, &cxt); } @@ -1112,6 +1123,48 @@ static int32_t pdcJoinAddPreFilterColsToTarget(SOptimizeContext* pCxt, SJoinLogi return code; } +static int32_t pdcJoinAddWhereFilterColsToTarget(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { + if (NULL == pJoin->node.pConditions) { + return TSDB_CODE_SUCCESS; + } + + int32_t code = TSDB_CODE_SUCCESS; + SNodeList* pCondCols = nodesMakeList(); + SNodeList* pTargets = NULL; + if (NULL == pCondCols) { + code = TSDB_CODE_OUT_OF_MEMORY; + } else { + code = 
nodesCollectColumnsFromNode(pJoin->node.pConditions, NULL, COLLECT_COL_TYPE_ALL, &pCondCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = createColumnByRewriteExprs(pCondCols, &pTargets); + } + + nodesDestroyList(pCondCols); + + if (TSDB_CODE_SUCCESS == code) { + SNode* pNode = NULL; + FOREACH(pNode, pTargets) { + SNode* pTmp = NULL; + bool found = false; + FOREACH(pTmp, pJoin->node.pTargets) { + if (nodesEqualNode(pTmp, pNode)) { + found = true; + break; + } + } + if (!found) { + nodesListStrictAppend(pJoin->node.pTargets, nodesCloneNode(pNode)); + } + } + } + + nodesDestroyList(pTargets); + + return code; +} + + static int32_t pdcJoinCheckAllCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { if (NULL == pJoin->pFullOnCond) { if ((!IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType)) || NULL == pJoin->node.pConditions) { @@ -1145,7 +1198,7 @@ static int32_t pdcDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { SNode* pLeftChildCond = NULL; SNode* pRightChildCond = NULL; int32_t code = pdcJoinCheckAllCond(pCxt, pJoin); - if (TSDB_CODE_SUCCESS == code && NULL != pJoin->node.pConditions && 0 != gJoinOpt[t][s].pushDownFlag) { + if (TSDB_CODE_SUCCESS == code && NULL != pJoin->node.pConditions && 0 != gJoinWhereOpt[t][s].pushDownFlag) { code = pdcJoinSplitCond(pJoin, &pJoin->node.pConditions, &pOnCond, &pLeftChildCond, &pRightChildCond, true); if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) { code = pdcJoinPushDownOnCond(pCxt, pJoin, &pOnCond); @@ -1184,6 +1237,10 @@ static int32_t pdcDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { code = pdcJoinAddPreFilterColsToTarget(pCxt, pJoin); } + if (TSDB_CODE_SUCCESS == code) { + code = pdcJoinAddWhereFilterColsToTarget(pCxt, pJoin); + } + if (TSDB_CODE_SUCCESS == code) { OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); pCxt->optimized = true; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 
894f020d75..050a923b0a 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -703,49 +703,6 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, return TSDB_CODE_FAILED; } -static int32_t mergeEqCond(SNode** ppDst, SNode** ppSrc) { - if (NULL == *ppSrc) { - return TSDB_CODE_SUCCESS; - } - if (NULL == *ppDst) { - *ppDst = *ppSrc; - *ppSrc = NULL; - return TSDB_CODE_SUCCESS; - } - if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppSrc)) { - TSWAP(*ppDst, *ppSrc); - } - if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppDst)) { - SLogicConditionNode* pLogic = (SLogicConditionNode*)*ppDst; - if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppSrc)) { - nodesListStrictAppendList(pLogic->pParameterList, ((SLogicConditionNode*)(*ppSrc))->pParameterList); - ((SLogicConditionNode*)(*ppSrc))->pParameterList = NULL; - } else { - nodesListStrictAppend(pLogic->pParameterList, *ppSrc); - *ppSrc = NULL; - } - nodesDestroyNode(*ppSrc); - *ppSrc = NULL; - return TSDB_CODE_SUCCESS; - } - - SLogicConditionNode* pLogicCond = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); - if (NULL == pLogicCond) { - return TSDB_CODE_OUT_OF_MEMORY; - } - pLogicCond->node.resType.type = TSDB_DATA_TYPE_BOOL; - pLogicCond->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; - pLogicCond->condType = LOGIC_COND_TYPE_AND; - pLogicCond->pParameterList = nodesMakeList(); - nodesListStrictAppend(pLogicCond->pParameterList, *ppSrc); - nodesListStrictAppend(pLogicCond->pParameterList, *ppDst); - - *ppDst = (SNode*)pLogicCond; - *ppSrc = NULL; - - return TSDB_CODE_SUCCESS; -} - static int32_t getJoinDataBlockDescNode(SNodeList* pChildren, int32_t idx, SDataBlockDescNode** ppDesc) { if (2 == pChildren->length) { *ppDesc = ((SPhysiNode*)nodesListGetNode(pChildren, idx))->pOutputDataBlockDesc; @@ -865,7 +822,7 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi } if (TSDB_CODE_SUCCESS == code 
&& ((NULL != pJoinLogicNode->pColEqCond) || (NULL != pJoinLogicNode->pTagEqCond))) { - code = mergeEqCond(&pJoinLogicNode->pColEqCond, &pJoinLogicNode->pTagEqCond); + code = mergeJoinConds(&pJoinLogicNode->pColEqCond, &pJoinLogicNode->pTagEqCond); } //TODO set from input blocks for group algo /* @@ -882,7 +839,7 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi } if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColOnCond) || (NULL != pJoinLogicNode->pTagOnCond))) { - code = mergeEqCond(&pJoinLogicNode->pColOnCond, &pJoinLogicNode->pTagOnCond); + code = mergeJoinConds(&pJoinLogicNode->pColOnCond, &pJoinLogicNode->pTagOnCond); } //TODO set from input blocks for group algo /* diff --git a/tests/script/tsim/join/left_join.sim b/tests/script/tsim/join/left_join.sim index 214b5ec9f6..d88c49f8c8 100644 --- a/tests/script/tsim/join/left_join.sim +++ b/tests/script/tsim/join/left_join.sim @@ -2,7 +2,7 @@ sql connect sql use test0; sql select a.col1, b.col1 from sta a left join sta b on a.ts = b.ts and a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by a.col1, b.col1; -if $rows != 5 then +if $rows != 10 then return -1 endi if $data00 != 1 then @@ -35,6 +35,70 @@ endi if $data41 != NULL then return -1 endi +if $data50 != 3 then + return -1 +endi +if $data51 != NULL then + return -1 +endi +if $data60 != 4 then + return -1 +endi +if $data61 != NULL then + return -1 +endi +if $data70 != 5 then + return -1 +endi +if $data71 != NULL then + return -1 +endi +if $data80 != 5 then + return -1 +endi +if $data81 != NULL then + return -1 +endi +if $data90 != 7 then + return -1 +endi +if $data91 != NULL then + return -1 +endi + +sql select a.col1, b.col1 from sta a left join sta b on a.ts = b.ts where a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by a.col1, b.col1; +if $rows != 4 then + return -1 +endi +if $data00 != 1 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +if $data10 != 1 then 
+ return -1 +endi +if $data11 != 2 then + return -1 +endi +if $data20 != 2 then + return -1 +endi +if $data21 != 1 then + return -1 +endi +if $data30 != 2 then + return -1 +endi +if $data31 != 2 then + return -1 +endi + +sql select a.col1, b.col1 from sta a left join sta b on a.ts = b.ts; +if $rows != 12 then + return -1 +endi sql select a.col1, b.col1 from tba1 a left join tba2 b on a.ts = b.ts order by a.col1, b.col1; if $rows != 4 then