enh: support inner join

This commit is contained in:
dapan1121 2023-12-26 19:28:19 +08:00
parent e570fcef99
commit 194b1f03ae
9 changed files with 1033 additions and 532 deletions

View File

@ -599,6 +599,7 @@ bool nodesIsTableStar(SNode* pNode);
char* getJoinTypeString(EJoinType type); char* getJoinTypeString(EJoinType type);
char* getJoinSTypeString(EJoinSubType type); char* getJoinSTypeString(EJoinSubType type);
char* getFullJoinTypeString(EJoinType type, EJoinSubType stype); char* getFullJoinTypeString(EJoinType type, EJoinSubType stype);
int32_t mergeJoinConds(SNode** ppDst, SNode** ppSrc);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -743,6 +743,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* de
extern void doDestroyExchangeOperatorInfo(void* param); extern void doDestroyExchangeOperatorInfo(void* param);
int32_t doFilterImpl(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo, SColumnInfoData** pResCol);
int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock, int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
int32_t rows, const char* idStr, STableMetaCacheInfo* pCache); int32_t rows, const char* idStr, STableMetaCacheInfo* pCache);

View File

@ -22,10 +22,12 @@ extern "C" {
#define MJOIN_DEFAULT_BLK_ROWS_NUM 4096 #define MJOIN_DEFAULT_BLK_ROWS_NUM 4096
#define MJOIN_HJOIN_CART_THRESHOLD 16 #define MJOIN_HJOIN_CART_THRESHOLD 16
#define MJOIN_BLK_SIZE_LIMIT 10485760 #define MJOIN_BLK_SIZE_LIMIT 10485760
#define MJOIN_ROW_BITMAP_SIZE (2 * 1048576)
struct SMJoinOperatorInfo; struct SMJoinOperatorInfo;
typedef SSDataBlock* (*joinImplFp)(SOperatorInfo*); typedef SSDataBlock* (*joinImplFp)(SOperatorInfo*);
typedef int32_t (*joinCartFp)(void*);
typedef enum EJoinTableType { typedef enum EJoinTableType {
E_JOIN_TB_BUILD = 1, E_JOIN_TB_BUILD = 1,
@ -33,6 +35,7 @@ typedef enum EJoinTableType {
} EJoinTableType; } EJoinTableType;
#define MJOIN_TBTYPE(_type) (E_JOIN_TB_BUILD == (_type) ? "BUILD" : "PROBE") #define MJOIN_TBTYPE(_type) (E_JOIN_TB_BUILD == (_type) ? "BUILD" : "PROBE")
#define IS_FULL_OUTER_JOIN(_jtype, _stype) ((_jtype) == JOIN_TYPE_FULL && (_stype) == JOIN_STYPE_OUTER)
typedef struct SMJoinRowPos { typedef struct SMJoinRowPos {
SSDataBlock* pBlk; SSDataBlock* pBlk;
@ -97,6 +100,10 @@ typedef struct SMJoinTableCtx {
int32_t grpRowIdx; int32_t grpRowIdx;
SArray* pHashCurGrp; SArray* pHashCurGrp;
SSHashObj* pGrpHash; SSHashObj* pGrpHash;
int64_t rowBitmapSize;
int64_t rowBitmapOffset;
char* pRowBitmap;
} SMJoinTableCtx; } SMJoinTableCtx;
typedef struct SMJoinGrpRows { typedef struct SMJoinGrpRows {
@ -104,22 +111,29 @@ typedef struct SMJoinGrpRows {
int32_t beginIdx; int32_t beginIdx;
int32_t endIdx; int32_t endIdx;
int32_t readIdx; int32_t readIdx;
int32_t rowBitmapOffset;
int32_t rowMatchNum;
bool readMatch; bool readMatch;
} SMJoinGrpRows; } SMJoinGrpRows;
typedef struct SMJoinMergeCtx { typedef struct SMJoinMergeCtx {
struct SMJoinOperatorInfo* pJoin; struct SMJoinOperatorInfo* pJoin;
bool ascTs;
bool hashCan; bool hashCan;
bool keepOrder; bool keepOrder;
bool grpRemains; bool grpRemains;
bool midRemains; bool midRemains;
bool lastEqGrp; bool lastEqGrp;
bool lastProbeGrp;
int32_t blkThreshold; int32_t blkThreshold;
SSDataBlock* midBlk; SSDataBlock* midBlk;
SSDataBlock* finBlk; SSDataBlock* finBlk;
int64_t lastEqTs; int64_t lastEqTs;
SMJoinGrpRows probeNEqGrp; SMJoinGrpRows probeNEqGrp;
SMJoinGrpRows buildNEqGrp;
bool hashJoin; bool hashJoin;
joinCartFp hashCartFp;
joinCartFp mergeCartFp;
} SMJoinMergeCtx; } SMJoinMergeCtx;
typedef struct SMJoinWinCtx { typedef struct SMJoinWinCtx {
@ -178,8 +192,8 @@ typedef struct SMJoinOperatorInfo {
#define SET_SAME_TS_GRP_HJOIN(_pair, _octx) ((_pair)->hashJoin = (_octx)->hashCan && REACH_HJOIN_THRESHOLD(_pair)) #define SET_SAME_TS_GRP_HJOIN(_pair, _octx) ((_pair)->hashJoin = (_octx)->hashCan && REACH_HJOIN_THRESHOLD(_pair))
#define LEFT_JOIN_NO_EQUAL(_order, _pts, _bts) ((_order) && (_pts) < (_bts)) || (!(_order) && (_pts) > (_bts)) #define PROBE_TS_UNREACH(_order, _pts, _bts) ((_order) && (_pts) < (_bts)) || (!(_order) && (_pts) > (_bts))
#define LEFT_JOIN_DISCRAD(_order, _pts, _bts) ((_order) && (_pts) > (_bts)) || (!(_order) && (_pts) < (_bts)) #define PROBE_TS_OVER(_order, _pts, _bts) ((_order) && (_pts) > (_bts)) || (!(_order) && (_pts) < (_bts))
#define GRP_REMAIN_ROWS(_grp) ((_grp)->endIdx - (_grp)->readIdx + 1) #define GRP_REMAIN_ROWS(_grp) ((_grp)->endIdx - (_grp)->readIdx + 1)
#define GRP_DONE(_grp) ((_grp)->readIdx > (_grp)->endIdx) #define GRP_DONE(_grp) ((_grp)->readIdx > (_grp)->endIdx)
@ -225,19 +239,22 @@ typedef struct SMJoinOperatorInfo {
} while (0) } while (0)
int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode); int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode);
SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator); SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator);
bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb); bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb);
void mJoinSetDone(SOperatorInfo* pOperator); void mJoinSetDone(SOperatorInfo* pOperator);
bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen); bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen);
void mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart); int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart);
int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableCtx* pTable, int64_t timestamp); int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableCtx* pTable, int64_t timestamp);
int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable); int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable);
int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable); int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable);
int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastBuildGrp);
bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build);
int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond);
int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx);
int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp, bool probeGrp);
int32_t mJoinNonEqCart(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp);
int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin);
#ifdef __cplusplus #ifdef __cplusplus
} }

File diff suppressed because it is too large Load Diff

View File

@ -27,6 +27,254 @@
#include "ttypes.h" #include "ttypes.h"
#include "mergejoin.h" #include "mergejoin.h"
int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) {
SSDataBlock* pLess = NULL;
SSDataBlock* pMore = NULL;
if ((*ppMid)->info.rows < (*ppFin)->info.rows) {
pLess = (*ppMid);
pMore = (*ppFin);
} else {
pLess = (*ppFin);
pMore = (*ppMid);
}
int32_t totalRows = pMore->info.rows + pLess->info.rows;
if (totalRows <= pMore->info.capacity) {
MJ_ERR_RET(blockDataMerge(pMore, pLess));
blockDataCleanup(pLess);
pCtx->midRemains = false;
} else {
int32_t copyRows = pMore->info.capacity - pMore->info.rows;
MJ_ERR_RET(blockDataMergeNRows(pMore, pLess, pLess->info.rows - copyRows, copyRows));
blockDataShrinkNRows(pLess, copyRows);
pCtx->midRemains = true;
}
if (pMore != (*ppFin)) {
TSWAP(*ppMid, *ppFin);
}
return TSDB_CODE_SUCCESS;
}
int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx) {
ASSERT(0 < pCtx->midBlk->info.rows);
TSWAP(pCtx->midBlk, pCtx->finBlk);
pCtx->midRemains = false;
return TSDB_CODE_SUCCESS;
}
int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp, bool probeGrp) {
SMJoinTableCtx* probe = probeGrp ? pJoin->probe : pJoin->build;
SMJoinTableCtx* build = probeGrp ? pJoin->build : pJoin->probe;
int32_t currRows = append ? pRes->info.rows : 0;
int32_t firstRows = GRP_REMAIN_ROWS(pGrp);
for (int32_t c = 0; c < probe->finNum; ++c) {
SMJoinColMap* pFirstCol = probe->finCols + c;
SColumnInfoData* pInCol = taosArrayGet(pGrp->blk->pDataBlock, pFirstCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot);
colDataAssignNRows(pOutCol, currRows, pInCol, pGrp->readIdx, firstRows);
}
for (int32_t c = 0; c < build->finNum; ++c) {
SMJoinColMap* pSecondCol = build->finCols + c;
SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot);
colDataSetNItemsNull(pOutCol, currRows, firstRows);
}
pRes->info.rows = append ? (pRes->info.rows + firstRows) : firstRows;
return TSDB_CODE_SUCCESS;
}
int32_t mJoinNonEqCart(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp) {
pCtx->lastEqGrp = false;
pCtx->lastProbeGrp = probeGrp;
int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
if (rowsLeft <= 0) {
pCtx->grpRemains = pGrp->readIdx <= pGrp->endIdx;
return TSDB_CODE_SUCCESS;
}
if (GRP_REMAIN_ROWS(pGrp) <= rowsLeft) {
MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, pGrp, probeGrp));
pGrp->readIdx = pGrp->endIdx + 1;
pCtx->grpRemains = false;
} else {
int32_t endIdx = pGrp->endIdx;
pGrp->endIdx = pGrp->readIdx + rowsLeft - 1;
MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, pGrp, probeGrp));
pGrp->readIdx = pGrp->endIdx + 1;
pGrp->endIdx = endIdx;
pCtx->grpRemains = true;
}
return TSDB_CODE_SUCCESS;
}
int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond) {
SMJoinTableCtx* probe = pJoin->probe;
SMJoinTableCtx* build = pJoin->build;
int32_t currRows = append ? pRes->info.rows : 0;
int32_t firstRows = GRP_REMAIN_ROWS(pFirst);
int32_t secondRows = GRP_REMAIN_ROWS(pSecond);
ASSERT(secondRows > 0);
for (int32_t c = 0; c < probe->finNum; ++c) {
SMJoinColMap* pFirstCol = probe->finCols + c;
SColumnInfoData* pInCol = taosArrayGet(pFirst->blk->pDataBlock, pFirstCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot);
for (int32_t r = 0; r < firstRows; ++r) {
if (colDataIsNull_s(pInCol, pFirst->readIdx + r)) {
colDataSetNItemsNull(pOutCol, currRows + r * secondRows, secondRows);
} else {
ASSERT(pRes->info.capacity >= (pRes->info.rows + firstRows * secondRows));
uint32_t startOffset = (IS_VAR_DATA_TYPE(pOutCol->info.type)) ? pOutCol->varmeta.length : ((currRows + r * secondRows) * pOutCol->info.bytes);
ASSERT((startOffset + 1 * pOutCol->info.bytes) <= pRes->info.capacity * pOutCol->info.bytes);
colDataSetNItems(pOutCol, currRows + r * secondRows, colDataGetData(pInCol, pFirst->readIdx + r), secondRows, true);
}
}
}
for (int32_t c = 0; c < build->finNum; ++c) {
SMJoinColMap* pSecondCol = build->finCols + c;
SColumnInfoData* pInCol = taosArrayGet(pSecond->blk->pDataBlock, pSecondCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot);
for (int32_t r = 0; r < firstRows; ++r) {
colDataAssignNRows(pOutCol, currRows + r * secondRows, pInCol, pSecond->readIdx, secondRows);
}
}
pRes->info.rows = append ? (pRes->info.rows + firstRows * secondRows) : firstRows * secondRows;
return TSDB_CODE_SUCCESS;
}
bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build) {
int32_t rowsLeft = append ? (pBlk->info.capacity - pBlk->info.rows) : pBlk->info.capacity;
if (rowsLeft <= 0) {
return false;
}
int32_t buildGrpRows = taosArrayGetSize(build->pHashCurGrp);
int32_t grpRows = buildGrpRows - build->grpRowIdx;
if (grpRows <= 0 || build->grpRowIdx < 0) {
build->grpRowIdx = -1;
return true;
}
int32_t actRows = TMIN(grpRows, rowsLeft);
int32_t currRows = append ? pBlk->info.rows : 0;
for (int32_t c = 0; c < probe->finNum; ++c) {
SMJoinColMap* pFirstCol = probe->finCols + c;
SColumnInfoData* pInCol = taosArrayGet(probeGrp->blk->pDataBlock, pFirstCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pFirstCol->dstSlot);
if (colDataIsNull_s(pInCol, probeGrp->readIdx)) {
colDataSetNItemsNull(pOutCol, currRows, actRows);
} else {
colDataSetNItems(pOutCol, currRows, colDataGetData(pInCol, probeGrp->readIdx), actRows, true);
}
}
for (int32_t c = 0; c < build->finNum; ++c) {
SMJoinColMap* pSecondCol = build->finCols + c;
SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pSecondCol->dstSlot);
for (int32_t r = 0; r < actRows; ++r) {
SMJoinRowPos* pRow = taosArrayGet(build->pHashCurGrp, build->grpRowIdx + r);
SColumnInfoData* pInCol = taosArrayGet(pRow->pBlk->pDataBlock, pSecondCol->srcSlot);
colDataAssignNRows(pOutCol, currRows + r, pInCol, pRow->pos, 1);
}
}
pBlk->info.rows += actRows;
if (actRows == grpRows) {
build->grpRowIdx = -1;
} else {
build->grpRowIdx += actRows;
}
if (actRows == rowsLeft) {
return false;
}
return true;
}
int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastBuildGrp) {
SMJoinOperatorInfo* pJoin = pCtx->pJoin;
pCtx->lastEqGrp = true;
mJoinBuildEqGroups(pJoin->probe, timestamp, NULL, true);
if (!lastBuildGrp) {
mJoinRetrieveEqGrpRows(pJoin->pOperator, pJoin->build, timestamp);
} else {
pJoin->build->grpIdx = 0;
}
if (pCtx->hashCan && REACH_HJOIN_THRESHOLD(pJoin->probe, pJoin->build)) {
if (!lastBuildGrp || !pCtx->hashJoin) {
MJ_ERR_RET(mJoinMakeBuildTbHash(pJoin, pJoin->build));
}
if (pJoin->probe->newBlk) {
MJ_ERR_RET(mJoinSetKeyColsData(pJoin->probe->blk, pJoin->probe));
pJoin->probe->newBlk = false;
}
pCtx->hashJoin = true;
return (*pCtx->hashCartFp)(pCtx);
}
pCtx->hashJoin = false;
return (*pCtx->mergeCartFp)(pCtx);
}
int32_t mJoinProcessNonEqualGrp(SMJoinMergeCtx* pCtx, SColumnInfoData* pCol, bool probeGrp, int64_t* probeTs, int64_t* buildTs) {
SMJoinGrpRows* pGrp;
SMJoinTableCtx* pTb;
int64_t* pTs;
if (probeGrp) {
pGrp = &pCtx->probeNEqGrp;
pTb = pCtx->pJoin->probe;
pTs = probeTs;
} else {
pGrp = &pCtx->buildNEqGrp;
pTb = pCtx->pJoin->build;
pTs = buildTs;
}
pGrp->blk = pTb->blk;
pGrp->beginIdx = pTb->blkRowIdx;
pGrp->readIdx = pGrp->beginIdx;
pGrp->endIdx = pGrp->beginIdx;
while (++pTb->blkRowIdx < pTb->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pCol, *pTs, pTb);
if (PROBE_TS_UNREACH(pCtx->ascTs, *probeTs, *buildTs)) {
pGrp->endIdx = pTb->blkRowIdx;
continue;
}
break;
}
return mJoinNonEqCart(pCtx, pGrp, );
}
SOperatorInfo** mJoinBuildDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream) { SOperatorInfo** mJoinBuildDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream) {
SOperatorInfo** p = taosMemoryMalloc(2 * POINTER_BYTES); SOperatorInfo** p = taosMemoryMalloc(2 * POINTER_BYTES);
if (p) { if (p) {
@ -145,6 +393,14 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi
if (NULL == pTable->createdBlks || NULL == pTable->pGrpArrays || NULL == pTable->pGrpHash) { if (NULL == pTable->createdBlks || NULL == pTable->pGrpArrays || NULL == pTable->pGrpHash) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
if (pJoin->pPreFilter && IS_FULL_OUTER_JOIN(pJoin->joinType, pJoin->subType)) {
pTable->rowBitmapSize = MJOIN_ROW_BITMAP_SIZE;
pTable->pRowBitmap = taosMemoryMalloc(pTable->rowBitmapSize);
if (NULL == pTable->pRowBitmap) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -245,12 +501,32 @@ static void mJoinDestroyCreatedBlks(SArray* pCreatedBlks) {
taosArrayClear(pCreatedBlks); taosArrayClear(pCreatedBlks);
} }
void mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) { static int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t *rowBitmapOffset) {
int32_t bitmapLen = BitmapLen(rowNum);
int64_t reqSize = pTable->rowBitmapOffset + bitmapLen;
if (reqSize > pTable->rowBitmapSize) {
int64_t newSize = reqSize * 1.1;
pTable->pRowBitmap = taosMemoryRealloc(pTable->pRowBitmap, newSize);
if (NULL == pTable->pRowBitmap) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pTable->rowBitmapSize = newSize;
}
memset(pTable->pRowBitmap + pTable->rowBitmapOffset, 0, bitmapLen);
*rowBitmapOffset = pTable->rowBitmapOffset;
pTable->rowBitmapOffset += bitmapLen;
return TSDB_CODE_SUCCESS;
}
int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) {
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot);
SMJoinGrpRows* pGrp = NULL; SMJoinGrpRows* pGrp = NULL;
if (*(int64_t*)colDataGetData(pCol, pTable->blkRowIdx) != timestamp) { if (*(int64_t*)colDataGetData(pCol, pTable->blkRowIdx) != timestamp) {
return; return TSDB_CODE_SUCCESS;
} }
if (restart) { if (restart) {
@ -275,8 +551,7 @@ void mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBl
continue; continue;
} }
pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1; goto _return;
return;
} }
if (wholeBlk) { if (wholeBlk) {
@ -292,7 +567,16 @@ void mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBl
taosArrayPush(pTable->createdBlks, &pGrp->blk); taosArrayPush(pTable->createdBlks, &pGrp->blk);
} }
_return:
if (wholeBlk && pTable->rowBitmapSize > 0) {
MJ_ERR_RET(mJoinGetRowBitmapOffset(pTable, pGrp->endIdx - pGrp->beginIdx + 1, &pGrp->rowBitmapOffset));
pGrp->rowMatchNum = 0;
}
pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1; pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1;
return TSDB_CODE_SUCCESS;
} }
@ -533,6 +817,7 @@ void destroyMergeJoinTableCtx(SMJoinTableCtx* pTable) {
taosMemoryFree(pTable->finCols); taosMemoryFree(pTable->finCols);
taosMemoryFree(pTable->keyCols); taosMemoryFree(pTable->keyCols);
taosMemoryFree(pTable->keyBuf); taosMemoryFree(pTable->keyBuf);
taosMemoryFree(pTable->pRowBitmap);
taosArrayDestroy(pTable->eqGrps); taosArrayDestroy(pTable->eqGrps);
taosArrayDestroyEx(pTable->pGrpArrays, destroyGrpArray); taosArrayDestroyEx(pTable->pGrpArrays, destroyGrpArray);
@ -562,6 +847,41 @@ void destroyMergeJoinOperator(void* param) {
taosMemoryFreeClear(pJoin); taosMemoryFreeClear(pJoin);
} }
int32_t mJoinHandleConds(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
switch (pJoin->joinType) {
case JOIN_TYPE_INNER: {
SNode* pCond = NULL;
if (pJoinNode->pFullOnCond != NULL) {
if (pJoinNode->node.pConditions != NULL) {
MJ_ERR_RET(mergeJoinConds(&pJoinNode->pFullOnCond, &pJoinNode->node.pConditions));
}
pCond = pJoinNode->pFullOnCond;
} else if (pJoinNode->node.pConditions != NULL) {
pCond = pJoinNode->node.pConditions;
}
MJ_ERR_RET(filterInitFromNode(pCond, &pJoin->pFinFilter, 0));
break;
}
case JOIN_TYPE_LEFT:
case JOIN_TYPE_RIGHT:
case JOIN_TYPE_FULL:
if (pJoinNode->pFullOnCond != NULL) {
MJ_ERR_RET(filterInitFromNode(pJoinNode->pFullOnCond, &pJoin->pFPreFilter, 0));
}
if (pJoinNode->pColOnCond != NULL) {
MJ_ERR_RET(filterInitFromNode(pJoinNode->pColOnCond, &pJoin->pPreFilter, 0));
}
if (pJoinNode->node.pConditions != NULL) {
MJ_ERR_RET(filterInitFromNode(pJoinNode->node.pConditions, &pJoin->pFinFilter, 0));
}
break;
default:
break;
}
}
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
@ -582,31 +902,13 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
mJoinSetBuildAndProbeTable(pInfo, pJoinNode); mJoinSetBuildAndProbeTable(pInfo, pJoinNode);
MJ_ERR_JRET(mJoinHandleConds(pInfo, pJoinNode));
mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]);
mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]);
if (pJoinNode->pFullOnCond != NULL) {
MJ_ERR_JRET(filterInitFromNode(pJoinNode->pFullOnCond, &pInfo->pFPreFilter, 0));
}
if (pJoinNode->pColOnCond != NULL) {
MJ_ERR_JRET(filterInitFromNode(pJoinNode->pColOnCond, &pInfo->pPreFilter, 0));
}
if (pJoinNode->node.pConditions != NULL) {
MJ_ERR_JRET(filterInitFromNode(pJoinNode->node.pConditions, &pInfo->pFinFilter, 0));
}
MJ_ERR_JRET(mJoinInitCtx(pInfo, pJoinNode)); MJ_ERR_JRET(mJoinInitCtx(pInfo, pJoinNode));
if (pJoinNode->node.inputTsOrder == ORDER_ASC) {
pInfo->inputTsOrder = TSDB_ORDER_ASC;
} else if (pJoinNode->node.inputTsOrder == ORDER_DESC) {
pInfo->inputTsOrder = TSDB_ORDER_DESC;
} else {
pInfo->inputTsOrder = TSDB_ORDER_ASC;
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mJoinMainProcess, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mJoinMainProcess, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
MJ_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream)); MJ_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));

View File

@ -65,6 +65,54 @@ char* getFullJoinTypeString(EJoinType type, EJoinSubType stype) {
return joinFullType[type][stype]; return joinFullType[type][stype];
} }
int32_t mergeJoinConds(SNode** ppDst, SNode** ppSrc) {
if (NULL == *ppSrc) {
return TSDB_CODE_SUCCESS;
}
if (NULL == *ppDst) {
*ppDst = *ppSrc;
*ppSrc = NULL;
return TSDB_CODE_SUCCESS;
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppSrc) && ((SLogicConditionNode*)(*ppSrc))->condType == LOGIC_COND_TYPE_AND) {
TSWAP(*ppDst, *ppSrc);
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppDst)) {
SLogicConditionNode* pDst = (SLogicConditionNode*)*ppDst;
if (pDst->condType == LOGIC_COND_TYPE_AND) {
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppSrc) && ((SLogicConditionNode*)(*ppSrc))->condType == LOGIC_COND_TYPE_AND) {
nodesListStrictAppendList(pDst->pParameterList, ((SLogicConditionNode*)(*ppSrc))->pParameterList);
((SLogicConditionNode*)(*ppSrc))->pParameterList = NULL;
} else {
nodesListStrictAppend(pDst->pParameterList, *ppSrc);
*ppSrc = NULL;
}
nodesDestroyNode(*ppSrc);
*ppSrc = NULL;
return TSDB_CODE_SUCCESS;
}
}
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
if (NULL == pLogicCond) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pLogicCond->node.resType.type = TSDB_DATA_TYPE_BOOL;
pLogicCond->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
pLogicCond->condType = LOGIC_COND_TYPE_AND;
pLogicCond->pParameterList = nodesMakeList();
nodesListStrictAppend(pLogicCond->pParameterList, *ppSrc);
nodesListStrictAppend(pLogicCond->pParameterList, *ppDst);
*ppDst = (SNode*)pLogicCond;
*ppSrc = NULL;
return TSDB_CODE_SUCCESS;
}
static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) { static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) {
SNodeMemChunk* pNewChunk = taosMemoryCalloc(1, sizeof(SNodeMemChunk) + pAllocator->chunkSize); SNodeMemChunk* pNewChunk = taosMemoryCalloc(1, sizeof(SNodeMemChunk) + pAllocator->chunkSize);
if (NULL == pNewChunk) { if (NULL == pNewChunk) {

View File

@ -76,6 +76,7 @@ typedef enum ECondAction {
#define PUSH_DOWN_LEFT_FLT (1 << 0) #define PUSH_DOWN_LEFT_FLT (1 << 0)
#define PUSH_DOWN_RIGHT_FLT (1 << 1) #define PUSH_DOWN_RIGHT_FLT (1 << 1)
#define PUSH_DOWN_ON_COND (1 << 2) #define PUSH_DOWN_ON_COND (1 << 2)
#define PUSH_DONW_FLT_COND (PUSH_DOWN_LEFT_FLT | PUSH_DOWN_RIGHT_FLT)
#define PUSH_DOWN_ALL_COND (PUSH_DOWN_LEFT_FLT | PUSH_DOWN_RIGHT_FLT | PUSH_DOWN_ON_COND) #define PUSH_DOWN_ALL_COND (PUSH_DOWN_LEFT_FLT | PUSH_DOWN_RIGHT_FLT | PUSH_DOWN_ON_COND)
typedef struct SJoinOptimizeOpt { typedef struct SJoinOptimizeOpt {
@ -94,7 +95,7 @@ static SJoinOptimizeOpt gJoinOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = {
/*FULL*/ {{0}, {0}, {0}, {0}, {0}, {0}, {0}}, /*FULL*/ {{0}, {0}, {0}, {0}, {0}, {0}, {0}},
}; };
#else #else
static SJoinOptimizeOpt gJoinOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = { static SJoinOptimizeOpt gJoinWhereOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = {
/* NONE OUTER SEMI ANTI ASOF WINDOW */ /* NONE OUTER SEMI ANTI ASOF WINDOW */
/*INNER*/ {{PUSH_DOWN_ALL_COND}, {0}, {0}, {0}, {0}, {0}}, /*INNER*/ {{PUSH_DOWN_ALL_COND}, {0}, {0}, {0}, {0}, {0}},
/*LEFT*/ {{0}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_ALL_COND}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}}, /*LEFT*/ {{0}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_ALL_COND}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}},
@ -102,6 +103,15 @@ static SJoinOptimizeOpt gJoinOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = {
/*FULL*/ {{0}, {0}, {0}, {0}, {0}, {0}}, /*FULL*/ {{0}, {0}, {0}, {0}, {0}, {0}},
}; };
static SJoinOptimizeOpt gJoinOnOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = {
/* NONE OUTER SEMI ANTI ASOF WINDOW */
/*INNER*/ {{PUSH_DONW_FLT_COND}, {0}, {0}, {0}, {0}, {0}},
/*LEFT*/ {{0}, {0}, {PUSH_DONW_FLT_COND}, {0}, {0}, {0}},
/*RIGHT*/ {{0}, {0}, {PUSH_DONW_FLT_COND}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_RIGHT_FLT}},
/*FULL*/ {{0}, {0}, {0}, {0}, {0}, {0}},
};
#endif #endif
static SLogicNode* optFindPossibleNode(SLogicNode* pNode, FMayBeOptimized func) { static SLogicNode* optFindPossibleNode(SLogicNode* pNode, FMayBeOptimized func) {
@ -575,19 +585,19 @@ static ECondAction pdcJoinGetCondAction(SJoinLogicNode* pJoin, SSHashObj* pLeftT
if (cxt.havaLeftCol) { if (cxt.havaLeftCol) {
if (cxt.haveRightCol) { if (cxt.haveRightCol) {
if ((!whereCond) || (gJoinOpt[t][s].pushDownFlag & PUSH_DOWN_ON_COND)) { if (whereCond && gJoinWhereOpt[t][s].pushDownFlag & PUSH_DOWN_ON_COND) {
return COND_ACTION_PUSH_JOIN; return COND_ACTION_PUSH_JOIN;
} }
return COND_ACTION_STAY; return COND_ACTION_STAY;
} }
if ((!whereCond) || (gJoinOpt[t][s].pushDownFlag & PUSH_DOWN_LEFT_FLT)) { if ((whereCond && gJoinWhereOpt[t][s].pushDownFlag & PUSH_DOWN_LEFT_FLT) || (!whereCond && gJoinOnOpt[t][s].pushDownFlag & PUSH_DOWN_LEFT_FLT)) {
return COND_ACTION_PUSH_LEFT_CHILD; return COND_ACTION_PUSH_LEFT_CHILD;
} }
return COND_ACTION_STAY; return COND_ACTION_STAY;
} }
if (cxt.haveRightCol) { if (cxt.haveRightCol) {
if ((!whereCond) || (gJoinOpt[t][s].pushDownFlag & PUSH_DOWN_RIGHT_FLT)) { if ((whereCond && gJoinWhereOpt[t][s].pushDownFlag & PUSH_DOWN_RIGHT_FLT) || (!whereCond && gJoinOnOpt[t][s].pushDownFlag & PUSH_DOWN_RIGHT_FLT)) {
return COND_ACTION_PUSH_RIGHT_CHILD; return COND_ACTION_PUSH_RIGHT_CHILD;
} }
return COND_ACTION_STAY; return COND_ACTION_STAY;
@ -1003,6 +1013,7 @@ static int32_t pdcJoinCollectColsFromParent(SJoinLogicNode* pJoin, SSHashObj* pT
} }
nodesWalkExpr(pJoin->pPrimKeyEqCond, pdcJoinCollectCondCol, &cxt); nodesWalkExpr(pJoin->pPrimKeyEqCond, pdcJoinCollectCondCol, &cxt);
nodesWalkExpr(pJoin->node.pConditions, pdcJoinCollectCondCol, &cxt);
if (TSDB_CODE_SUCCESS == cxt.errCode) { if (TSDB_CODE_SUCCESS == cxt.errCode) {
nodesWalkExpr(pJoin->pFullOnCond, pdcJoinCollectCondCol, &cxt); nodesWalkExpr(pJoin->pFullOnCond, pdcJoinCollectCondCol, &cxt);
} }
@ -1112,6 +1123,48 @@ static int32_t pdcJoinAddPreFilterColsToTarget(SOptimizeContext* pCxt, SJoinLogi
return code; return code;
} }
static int32_t pdcJoinAddWhereFilterColsToTarget(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (NULL == pJoin->node.pConditions) {
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
SNodeList* pCondCols = nodesMakeList();
SNodeList* pTargets = NULL;
if (NULL == pCondCols) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
code = nodesCollectColumnsFromNode(pJoin->node.pConditions, NULL, COLLECT_COL_TYPE_ALL, &pCondCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pCondCols, &pTargets);
}
nodesDestroyList(pCondCols);
if (TSDB_CODE_SUCCESS == code) {
SNode* pNode = NULL;
FOREACH(pNode, pTargets) {
SNode* pTmp = NULL;
bool found = false;
FOREACH(pTmp, pJoin->node.pTargets) {
if (nodesEqualNode(pTmp, pNode)) {
found = true;
break;
}
}
if (!found) {
nodesListStrictAppend(pJoin->node.pTargets, nodesCloneNode(pNode));
}
}
}
nodesDestroyList(pTargets);
return code;
}
static int32_t pdcJoinCheckAllCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { static int32_t pdcJoinCheckAllCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (NULL == pJoin->pFullOnCond) { if (NULL == pJoin->pFullOnCond) {
if ((!IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType)) || NULL == pJoin->node.pConditions) { if ((!IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType)) || NULL == pJoin->node.pConditions) {
@ -1145,7 +1198,7 @@ static int32_t pdcDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
SNode* pLeftChildCond = NULL; SNode* pLeftChildCond = NULL;
SNode* pRightChildCond = NULL; SNode* pRightChildCond = NULL;
int32_t code = pdcJoinCheckAllCond(pCxt, pJoin); int32_t code = pdcJoinCheckAllCond(pCxt, pJoin);
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->node.pConditions && 0 != gJoinOpt[t][s].pushDownFlag) { if (TSDB_CODE_SUCCESS == code && NULL != pJoin->node.pConditions && 0 != gJoinWhereOpt[t][s].pushDownFlag) {
code = pdcJoinSplitCond(pJoin, &pJoin->node.pConditions, &pOnCond, &pLeftChildCond, &pRightChildCond, true); code = pdcJoinSplitCond(pJoin, &pJoin->node.pConditions, &pOnCond, &pLeftChildCond, &pRightChildCond, true);
if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) { if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) {
code = pdcJoinPushDownOnCond(pCxt, pJoin, &pOnCond); code = pdcJoinPushDownOnCond(pCxt, pJoin, &pOnCond);
@ -1184,6 +1237,10 @@ static int32_t pdcDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
code = pdcJoinAddPreFilterColsToTarget(pCxt, pJoin); code = pdcJoinAddPreFilterColsToTarget(pCxt, pJoin);
} }
if (TSDB_CODE_SUCCESS == code) {
code = pdcJoinAddWhereFilterColsToTarget(pCxt, pJoin);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE);
pCxt->optimized = true; pCxt->optimized = true;

View File

@ -703,49 +703,6 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
static int32_t mergeEqCond(SNode** ppDst, SNode** ppSrc) {
if (NULL == *ppSrc) {
return TSDB_CODE_SUCCESS;
}
if (NULL == *ppDst) {
*ppDst = *ppSrc;
*ppSrc = NULL;
return TSDB_CODE_SUCCESS;
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppSrc)) {
TSWAP(*ppDst, *ppSrc);
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppDst)) {
SLogicConditionNode* pLogic = (SLogicConditionNode*)*ppDst;
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppSrc)) {
nodesListStrictAppendList(pLogic->pParameterList, ((SLogicConditionNode*)(*ppSrc))->pParameterList);
((SLogicConditionNode*)(*ppSrc))->pParameterList = NULL;
} else {
nodesListStrictAppend(pLogic->pParameterList, *ppSrc);
*ppSrc = NULL;
}
nodesDestroyNode(*ppSrc);
*ppSrc = NULL;
return TSDB_CODE_SUCCESS;
}
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
if (NULL == pLogicCond) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pLogicCond->node.resType.type = TSDB_DATA_TYPE_BOOL;
pLogicCond->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
pLogicCond->condType = LOGIC_COND_TYPE_AND;
pLogicCond->pParameterList = nodesMakeList();
nodesListStrictAppend(pLogicCond->pParameterList, *ppSrc);
nodesListStrictAppend(pLogicCond->pParameterList, *ppDst);
*ppDst = (SNode*)pLogicCond;
*ppSrc = NULL;
return TSDB_CODE_SUCCESS;
}
static int32_t getJoinDataBlockDescNode(SNodeList* pChildren, int32_t idx, SDataBlockDescNode** ppDesc) { static int32_t getJoinDataBlockDescNode(SNodeList* pChildren, int32_t idx, SDataBlockDescNode** ppDesc) {
if (2 == pChildren->length) { if (2 == pChildren->length) {
*ppDesc = ((SPhysiNode*)nodesListGetNode(pChildren, idx))->pOutputDataBlockDesc; *ppDesc = ((SPhysiNode*)nodesListGetNode(pChildren, idx))->pOutputDataBlockDesc;
@ -865,7 +822,7 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
} }
if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColEqCond) || (NULL != pJoinLogicNode->pTagEqCond))) { if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColEqCond) || (NULL != pJoinLogicNode->pTagEqCond))) {
code = mergeEqCond(&pJoinLogicNode->pColEqCond, &pJoinLogicNode->pTagEqCond); code = mergeJoinConds(&pJoinLogicNode->pColEqCond, &pJoinLogicNode->pTagEqCond);
} }
//TODO set from input blocks for group algo //TODO set from input blocks for group algo
/* /*
@ -882,7 +839,7 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
} }
if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColOnCond) || (NULL != pJoinLogicNode->pTagOnCond))) { if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColOnCond) || (NULL != pJoinLogicNode->pTagOnCond))) {
code = mergeEqCond(&pJoinLogicNode->pColOnCond, &pJoinLogicNode->pTagOnCond); code = mergeJoinConds(&pJoinLogicNode->pColOnCond, &pJoinLogicNode->pTagOnCond);
} }
//TODO set from input blocks for group algo //TODO set from input blocks for group algo
/* /*

View File

@ -2,7 +2,7 @@ sql connect
sql use test0; sql use test0;
sql select a.col1, b.col1 from sta a left join sta b on a.ts = b.ts and a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by a.col1, b.col1; sql select a.col1, b.col1 from sta a left join sta b on a.ts = b.ts and a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by a.col1, b.col1;
if $rows != 5 then if $rows != 10 then
return -1 return -1
endi endi
if $data00 != 1 then if $data00 != 1 then
@ -35,6 +35,70 @@ endi
if $data41 != NULL then if $data41 != NULL then
return -1 return -1
endi endi
if $data50 != 3 then
return -1
endi
if $data51 != NULL then
return -1
endi
if $data60 != 4 then
return -1
endi
if $data61 != NULL then
return -1
endi
if $data70 != 5 then
return -1
endi
if $data71 != NULL then
return -1
endi
if $data80 != 5 then
return -1
endi
if $data81 != NULL then
return -1
endi
if $data90 != 7 then
return -1
endi
if $data91 != NULL then
return -1
endi
sql select a.col1, b.col1 from sta a left join sta b on a.ts = b.ts where a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by a.col1, b.col1;
if $rows != 4 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data10 != 1 then
return -1
endi
if $data11 != 2 then
return -1
endi
if $data20 != 2 then
return -1
endi
if $data21 != 1 then
return -1
endi
if $data30 != 2 then
return -1
endi
if $data31 != 2 then
return -1
endi
sql select a.col1, b.col1 from sta a left join sta b on a.ts = b.ts;
if $rows != 12 then
return -1
endi
sql select a.col1, b.col1 from tba1 a left join tba2 b on a.ts = b.ts order by a.col1, b.col1; sql select a.col1, b.col1 from tba1 a left join tba2 b on a.ts = b.ts order by a.col1, b.col1;
if $rows != 4 then if $rows != 4 then