enh: left join ut

This commit is contained in:
dapan1121 2023-12-22 19:25:55 +08:00
parent bc38885856
commit 8bdf3df6a9
9 changed files with 1989 additions and 1208 deletions

View File

@ -210,6 +210,7 @@ size_t blockDataGetNumOfRows(const SSDataBlock* pBlock);
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc);
int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t srcIdx, int32_t numOfRows);
void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows);
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
int32_t pageSize);
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);

View File

@ -31,7 +31,7 @@ extern "C" {
#define TSWAP(a, b) \
do { \
char *__tmp = alloca(sizeof(a)); \
char *__tmp = (char*)alloca(sizeof(a)); \
memcpy(__tmp, &(a), sizeof(a)); \
memcpy(&(a), &(b), sizeof(a)); \
memcpy(&(b), __tmp, sizeof(a)); \

View File

@ -242,10 +242,12 @@ int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow,
}
void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow, uint32_t numOfRows) {
pColumnInfoData->hasNull = true;
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
memset(&pColumnInfoData->varmeta.offset[currentRow], -1, sizeof(int32_t) * numOfRows);
} else {
if (numOfRows < sizeof(char) * 2) {
if (numOfRows < 16) {
for (int32_t i = 0; i < numOfRows; ++i) {
colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + i);
}
@ -259,8 +261,9 @@ void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow,
}
}
memset(&BMCharPos(pColumnInfoData->nullbitmap, currentRow + i), 0xFF, (numOfRows - i) / sizeof(char));
i += (numOfRows - i) / sizeof(char) * sizeof(char);
int32_t bytes = (numOfRows - i) / 8;
memset(&BMCharPos(pColumnInfoData->nullbitmap, currentRow + i), 0xFF, bytes);
i += bytes * 8;
for (; i < numOfRows; ++i) {
colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + i);
@ -542,33 +545,14 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI
}
} else {
if (pSrc->hasNull) {
if (0 == BitPos(dstIdx) && 0 == BitPos(srcIdx)) {
memcpy(&BMCharPos(pDst->nullbitmap, dstIdx), &BMCharPos(pSrc->nullbitmap, srcIdx), BitmapLen(numOfRows));
if (!pDst->hasNull) {
int32_t nullBytes = BitmapLen(numOfRows);
int32_t startPos = CharPos(dstIdx);
for (int32_t i = 0; i < nullBytes; ++i) {
if (pDst->nullbitmap[startPos + i]) {
pDst->hasNull = true;
break;
}
}
}
} else if (BitPos(dstIdx) == BitPos(srcIdx)) {
if (BitPos(dstIdx) == BitPos(srcIdx)) {
for (int32_t i = 0; i < numOfRows; ++i) {
if (0 == BitPos(dstIdx)) {
memcpy(&BMCharPos(pDst->nullbitmap, dstIdx + i), &BMCharPos(pSrc->nullbitmap, srcIdx + i), BitmapLen(numOfRows - i));
if (!pDst->hasNull) {
int32_t nullBytes = BitmapLen(numOfRows - i);
int32_t startPos = CharPos(dstIdx + i);
for (int32_t m = 0; m < nullBytes; ++m) {
if (pDst->nullbitmap[startPos + m]) {
pDst->hasNull = true;
break;
}
}
if (0 == BitPos(dstIdx) && (i + (1 << NBIT) <= numOfRows)) {
BMCharPos(pDst->nullbitmap, dstIdx + i) = BMCharPos(pSrc->nullbitmap, srcIdx + i);
if (BMCharPos(pDst->nullbitmap, dstIdx + i)) {
pDst->hasNull = true;
}
break;
i += (1 << NBIT) - 1;
} else {
if (colDataIsNull_f(pSrc->nullbitmap, srcIdx + i)) {
colDataSetNull_f(pDst->nullbitmap, dstIdx + i);
@ -588,9 +572,7 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI
}
}
}
} else {
memset(&BMCharPos(pDst->nullbitmap, dstIdx), 0, BitmapLen(numOfRows));
}
}
if (pSrc->pData != NULL) {
memcpy(pDst->pData + pDst->info.bytes * dstIdx, pSrc->pData + pSrc->info.bytes * srcIdx, pDst->info.bytes * numOfRows);
@ -665,10 +647,45 @@ int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t
colDataAssignNRows(pCol2, pDest->info.rows, pCol1, srcIdx, numOfRows);
}
pDest->info.rows += pSrc->info.rows;
pDest->info.rows += numOfRows;
return TSDB_CODE_SUCCESS;
}
/*
 * Drop the last `numOfRows` rows of `pBlock` in place.
 *
 * - numOfRows <= 0: nothing to drop, the block is left untouched.
 * - numOfRows >= current row count: the whole block is reset via blockDataCleanup().
 * - otherwise: for var-length columns the offsets of the dropped rows are zeroed and
 *   varmeta.length is rewound to the offset of the first dropped row; for fixed-length
 *   columns the null bits of the dropped rows are cleared. Finally info.rows is reduced.
 *
 * NOTE(review): varmeta.length is taken from the offset slot of the first dropped row.
 * If that row is NULL its slot may hold a sentinel rather than a real offset
 * (colDataSetNItemsNull fills offsets with 0xFF bytes) - confirm callers never hit that.
 */
void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) {
  // Without this guard numOfRows == 0 would read offset[info.rows] (one past the last
  // row) below, and a negative count would reach memset() as a huge size_t.
  if (numOfRows <= 0) {
    return;
  }

  if (numOfRows >= pBlock->info.rows) {
    blockDataCleanup(pBlock);
    return;
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pCol->varmeta.length = pCol->varmeta.offset[pBlock->info.rows - numOfRows];
      memset(pCol->varmeta.offset + pBlock->info.rows - numOfRows, 0, sizeof(*pCol->varmeta.offset) * numOfRows);
    } else {
      // Separate name: the original reused `i` here and shadowed the column index.
      int32_t row = pBlock->info.rows - numOfRows;

      // Clear bit by bit until the next bitmap byte boundary.
      for (; row < pBlock->info.rows; ++row) {
        if (BitPos(row)) {
          colDataClearNull_f(pCol->nullbitmap, row);
        } else {
          break;
        }
      }

      // Clear whole bitmap bytes (8 rows per byte) in one go.
      int32_t bytes = (pBlock->info.rows - row) / 8;
      memset(&BMCharPos(pCol->nullbitmap, row), 0, bytes);
      row += bytes * 8;

      // Clear the trailing rows that do not fill a whole byte.
      for (; row < pBlock->info.rows; ++row) {
        colDataClearNull_f(pCol->nullbitmap, row);
      }
    }
  }

  pBlock->info.rows -= numOfRows;
}
size_t blockDataGetSize(const SSDataBlock* pBlock) {
size_t total = 0;
@ -2542,6 +2559,8 @@ void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList
int32_t numOfRows = 0;
if (IS_VAR_DATA_TYPE(pDst->info.type)) {
pDst->varmeta.length = 0;
} else {
memset(pDst->nullbitmap, 0, bmLen);
}
}
return;

View File

@ -19,9 +19,9 @@
extern "C" {
#endif
#define MJOIN_DEFAULT_BLK_ROWS_NUM 4096
#define MJOIN_DEFAULT_BLK_ROWS_NUM 4
#define MJOIN_HJOIN_CART_THRESHOLD 16
#define MJOIN_BLK_SIZE_LIMIT 10485760
#define MJOIN_BLK_SIZE_LIMIT 20
struct SMJoinOperatorInfo;
@ -86,6 +86,7 @@ typedef struct SMJoinTableCtx {
SArray* valVarCols;
bool valColExist;
bool newBlk;
SSDataBlock* blk;
int32_t blkRowIdx;
@ -199,9 +200,11 @@ typedef struct SMJoinOperatorInfo {
#define MJOIN_GET_TB_COL_TS(_col, _ts, _tb) \
do { \
if (NULL != (_tb)->blk) { \
if (NULL != (_tb)->blk && (_tb)->blkRowIdx < (_tb)->blk->info.rows) { \
(_col) = taosArrayGet((_tb)->blk->pDataBlock, (_tb)->primCol->srcSlot); \
(_ts) = *((int64_t*)(_col)->pData + (_tb)->blkRowIdx); \
} else { \
(_ts) = INT64_MIN; \
} \
} while (0)
@ -228,6 +231,20 @@ typedef struct SMJoinOperatorInfo {
goto _return; \
} \
} while (0)
// Merge-join routines declared here so they can be used from more than one source file.
// mJoinInitMergeCtx() prepares the merge context (output block, optional intermediate
// block, flush threshold); mLeftJoinDo() produces the next left-join result block.
int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode);
SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator);
// Helpers called by the join implementations; their definitions are not in this header.
bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb);
void mJoinSetDone(SOperatorInfo* pOperator);
bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen);
void mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart);
int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableCtx* pTable, int64_t timestamp);
int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable);
int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable);
#ifdef __cplusplus

View File

@ -0,0 +1,862 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "operator.h"
#include "os.h"
#include "querynodes.h"
#include "querytask.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "thash.h"
#include "tmsg.h"
#include "ttypes.h"
#include "mergejoin.h"
/*
 * Initialise the merge-join context embedded in `pJoin`.
 *
 * Creates the final output block (finBlk) from the node's output descriptor and sizes it
 * to max(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT / totalRowSize) rows. When a
 * full pre-filter (pFPreFilter) is configured, an intermediate block (midBlk) with the
 * same layout and capacity is created as well. blkThreshold is set to half of finBlk's
 * capacity; the join loops return finBlk once it holds at least that many rows.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY if a block could not be created, or
 *         the error reported by blockDataEnsureCapacity().
 *
 * NOTE(review): on failure, blocks already stored in the context are not released here;
 * presumably the operator's destroy path owns them - confirm.
 */
int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
  SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;

  pCtx->pJoin = pJoin;
  pCtx->lastEqTs = INT64_MIN;
  // Hash join is only an option when the probe side has equality key columns.
  pCtx->hashCan = pJoin->probe->keyNum > 0;

  pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
  if (NULL == pCtx->finBlk) {
    // Previously a failed allocation was dereferenced by blockDataEnsureCapacity().
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize)));

  if (pJoin->pFPreFilter) {
    pCtx->midBlk = createOneDataBlock(pCtx->finBlk, false);
    if (NULL == pCtx->midBlk) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    MJ_ERR_RET(blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity));
  }

  pCtx->blkThreshold = pCtx->finBlk->info.capacity * 0.5;

  return TSDB_CODE_SUCCESS;
}
/*
 * Emit the unread rows of probe group `pGrp` as unmatched left-join rows: probe output
 * columns are copied from the group's block, build output columns are set to NULL.
 *
 * @param append  true: write after the rows already in pRes; false: write from row 0.
 * @return always TSDB_CODE_SUCCESS.
 */
static int32_t mLeftJoinGrpNonEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp) {
  SMJoinTableCtx* pProbeTb = pJoin->probe;
  SMJoinTableCtx* pBuildTb = pJoin->build;
  int32_t         dstRow = append ? pRes->info.rows : 0;
  int32_t         rowNum = GRP_REMAIN_ROWS(pGrp);

  // Probe side: straight copy of the remaining rows.
  for (int32_t idx = 0; idx < pProbeTb->finNum; ++idx) {
    SMJoinColMap*    pMap = pProbeTb->finCols + idx;
    SColumnInfoData* pSrc = taosArrayGet(pGrp->blk->pDataBlock, pMap->srcSlot);
    SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pMap->dstSlot);
    colDataAssignNRows(pDst, dstRow, pSrc, pGrp->readIdx, rowNum);
  }

  // Build side: no matching row exists, so every output cell is NULL.
  for (int32_t idx = 0; idx < pBuildTb->finNum; ++idx) {
    SMJoinColMap*    pMap = pBuildTb->finCols + idx;
    SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pMap->dstSlot);
    colDataSetNItemsNull(pDst, dstRow, rowNum);
  }

  if (append) {
    pRes->info.rows = pRes->info.rows + rowNum;
  } else {
    pRes->info.rows = rowNum;
  }

  return TSDB_CODE_SUCCESS;
}
/*
 * Write the cartesian product of the unread rows of probe group `pFirst` and build group
 * `pSecond` into `pRes`. Output is probe-major: each probe row is repeated once per build
 * row, and the build rows are copied once per probe row.
 *
 * @param append  true: write after the rows already in pRes; false: write from row 0.
 * @return always TSDB_CODE_SUCCESS.
 */
static int32_t mLeftJoinGrpEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond) {
  SMJoinTableCtx* pProbeTb = pJoin->probe;
  SMJoinTableCtx* pBuildTb = pJoin->build;
  int32_t         outBase = append ? pRes->info.rows : 0;
  int32_t         probeRowNum = GRP_REMAIN_ROWS(pFirst);
  int32_t         buildRowNum = GRP_REMAIN_ROWS(pSecond);
  ASSERT(buildRowNum > 0);

  // Probe columns: replicate every probe value (or NULL) buildRowNum times.
  for (int32_t colIdx = 0; colIdx < pProbeTb->finNum; ++colIdx) {
    SMJoinColMap*    pMap = pProbeTb->finCols + colIdx;
    SColumnInfoData* pSrc = taosArrayGet(pFirst->blk->pDataBlock, pMap->srcSlot);
    SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pMap->dstSlot);

    for (int32_t rowIdx = 0; rowIdx < probeRowNum; ++rowIdx) {
      if (!colDataIsNull_s(pSrc, pFirst->readIdx + rowIdx)) {
        ASSERT(pRes->info.capacity >= (pRes->info.rows + probeRowNum * buildRowNum));
        uint32_t startOffset = (IS_VAR_DATA_TYPE(pDst->info.type)) ? pDst->varmeta.length : ((outBase + rowIdx * buildRowNum) * pDst->info.bytes);
        ASSERT((startOffset + 1 * pDst->info.bytes) <= pRes->info.capacity * pDst->info.bytes);
        colDataSetNItems(pDst, outBase + rowIdx * buildRowNum, colDataGetData(pSrc, pFirst->readIdx + rowIdx), buildRowNum, true);
      } else {
        colDataSetNItemsNull(pDst, outBase + rowIdx * buildRowNum, buildRowNum);
      }
    }
  }

  // Build columns: copy the whole build run once for each probe row.
  for (int32_t colIdx = 0; colIdx < pBuildTb->finNum; ++colIdx) {
    SMJoinColMap*    pMap = pBuildTb->finCols + colIdx;
    SColumnInfoData* pSrc = taosArrayGet(pSecond->blk->pDataBlock, pMap->srcSlot);
    SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pMap->dstSlot);

    for (int32_t rowIdx = 0; rowIdx < probeRowNum; ++rowIdx) {
      colDataAssignNRows(pDst, outBase + rowIdx * buildRowNum, pSrc, pSecond->readIdx, buildRowNum);
    }
  }

  if (append) {
    pRes->info.rows = pRes->info.rows + probeRowNum * buildRowNum;
  } else {
    pRes->info.rows = probeRowNum * buildRowNum;
  }

  return TSDB_CODE_SUCCESS;
}
/*
 * Merge-join path without a pre-filter: append the cartesian product of the first probe
 * equal group and all build equal groups to pCtx->finBlk, as far as capacity allows.
 *
 * Fast path: when nothing of the build groups has been consumed yet and the whole product
 * fits into the free rows of finBlk, all build groups are joined in one pass.
 * Slow path: one probe row at a time is joined against the build groups; when finBlk
 * fills up in the middle of a build group, that group's readIdx records the resume point.
 *
 * On return pCtx->grpRemains tells whether probe rows of this group are still pending.
 *
 * @return TSDB_CODE_SUCCESS, or the first error reported by mLeftJoinGrpEqCart().
 */
static int32_t mLeftJoinMergeFullCart(SMJoinMergeCtx* pCtx) {
  int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;
  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
  int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
  int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
  int32_t probeEndIdx = probeGrp->endIdx;

  if (0 == build->grpIdx && probeRows * build->grpTotalRows <= rowsLeft) {
    SMJoinGrpRows* pFirstBuild = taosArrayGet(build->eqGrps, 0);
    if (pFirstBuild->readIdx == pFirstBuild->beginIdx) {
      for (; build->grpIdx < buildGrpNum; ++build->grpIdx) {
        SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
        MJ_ERR_RET(mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
        buildGrp->readIdx = buildGrp->beginIdx;
      }

      pCtx->grpRemains = false;
      return TSDB_CODE_SUCCESS;
    }
  }

  for (; !GRP_DONE(probeGrp); ) {
    // Narrow the probe group to the single row currently being joined.
    probeGrp->endIdx = probeGrp->readIdx;
    for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) {
      SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);

      if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
        MJ_ERR_RET(mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
        // Subtract before rewinding readIdx: GRP_REMAIN_ROWS depends on it.
        rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
        buildGrp->readIdx = buildGrp->beginIdx;
        continue;
      }

      // Only part of this build group fits: temporarily shorten it to rowsLeft rows.
      int32_t buildEndIdx = buildGrp->endIdx;
      buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
      // The result used to be dropped here, unlike every other call site.
      MJ_ERR_RET(mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
      buildGrp->readIdx += rowsLeft;
      buildGrp->endIdx = buildEndIdx;
      rowsLeft = 0;
      break;
    }

    probeGrp->endIdx = probeEndIdx;

    // All build groups consumed for this probe row: move on to the next probe row.
    if (build->grpIdx >= buildGrpNum) {
      build->grpIdx = 0;
      ++probeGrp->readIdx;
    }

    if (rowsLeft <= 0) {
      break;
    }
  }

  probeGrp->endIdx = probeEndIdx;

  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;

  return TSDB_CODE_SUCCESS;
}
/*
 * Combine the intermediate block (*ppMid) and the final block (*ppFin).
 *
 * Rows of the block holding fewer rows are moved into the one holding more. If everything
 * fits, the smaller block is emptied and pCtx->midRemains is cleared. Otherwise only as
 * many trailing rows as fit are moved, the smaller block is shrunk by that amount and
 * pCtx->midRemains is set. Afterwards the two pointers are swapped if needed so that
 * *ppFin is the block that received the rows.
 *
 * @return TSDB_CODE_SUCCESS, or the error from blockDataMerge()/blockDataMergeNRows().
 */
static int32_t mLeftJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) {
  bool         midIsSmaller = (*ppMid)->info.rows < (*ppFin)->info.rows;
  SSDataBlock* pLess = midIsSmaller ? (*ppMid) : (*ppFin);
  SSDataBlock* pMore = midIsSmaller ? (*ppFin) : (*ppMid);

  int32_t totalRows = pMore->info.rows + pLess->info.rows;
  if (totalRows > pMore->info.capacity) {
    // Move only the tail of the smaller block; the rest stays behind for the next round.
    int32_t copyRows = pMore->info.capacity - pMore->info.rows;
    MJ_ERR_RET(blockDataMergeNRows(pMore, pLess, pLess->info.rows - copyRows, copyRows));
    blockDataShrinkNRows(pLess, copyRows);
    pCtx->midRemains = true;
  } else {
    MJ_ERR_RET(blockDataMerge(pMore, pLess));
    blockDataCleanup(pLess);
    pCtx->midRemains = false;
  }

  if ((*ppFin) != pMore) {
    TSWAP(*ppMid, *ppFin);
  }

  return TSDB_CODE_SUCCESS;
}
/*
 * Merge-join path used when a full pre-filter (pFPreFilter) exists.
 *
 * Probe rows are handled one at a time: the row is joined against the build equal groups
 * into midBlk, midBlk is filtered with pFPreFilter, and surviving rows are merged into
 * finBlk. A probe row whose candidates are all filtered out is emitted once with NULL
 * build columns. Processing stops when the probe group is done, finBlk is full, or rows
 * are left over in midBlk (pCtx->midRemains). pCtx->grpRemains reports pending probe rows.
 */
static int32_t mLeftJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
// Saved so the group can be restored after being narrowed to a single row.
int32_t probeEndIdx = probeGrp->endIdx;
int32_t rowsLeft = pCtx->midBlk->info.capacity;
bool contLoop = true;
blockDataCleanup(pCtx->midBlk);
do {
// The loop increment runs only via `continue`: it advances to the next probe row and
// resets the per-row state (readMatch, endIdx, build->grpIdx).
for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk);
++probeGrp->readIdx, probeGrp->readMatch = false, probeGrp->endIdx = probeEndIdx, build->grpIdx = 0) {
// Join exactly one probe row per iteration.
probeGrp->endIdx = probeGrp->readIdx;
rowsLeft = pCtx->midBlk->info.capacity;
for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) {
SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
MJ_ERR_RET(mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
// Subtract before rewinding readIdx: GRP_REMAIN_ROWS depends on it.
rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
buildGrp->readIdx = buildGrp->beginIdx;
continue;
}
// Only part of this build group fits into midBlk: shorten it temporarily and record
// the resume position in readIdx.
int32_t buildEndIdx = buildGrp->endIdx;
buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
ASSERT(buildGrp->endIdx >= buildGrp->readIdx);
MJ_ERR_RET(mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
buildGrp->readIdx += rowsLeft;
buildGrp->endIdx = buildEndIdx;
rowsLeft = 0;
break;
}
// Apply the pre-filter to the candidates; readMatch remembers that this probe row has
// produced at least one surviving row.
if (pCtx->midBlk->info.rows > 0) {
MJ_ERR_RET(doFilter(pCtx->midBlk, pCtx->pJoin->pFPreFilter, NULL));
if (pCtx->midBlk->info.rows > 0) {
probeGrp->readMatch = true;
}
}
if (0 == pCtx->midBlk->info.rows) {
if (build->grpIdx == buildGrpNum) {
// Every build group was tried and nothing ever matched: emit the probe row with
// NULL build columns.
if (!probeGrp->readMatch) {
MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp));
}
continue;
}
} else {
MJ_ERR_RET(mLeftJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk));
if (pCtx->midRemains) {
// Rows are left over in midBlk; stop after this pass so the caller can flush.
contLoop = false;
} else if (build->grpIdx == buildGrpNum) {
continue;
}
}
// Reached when the current probe row still has build rows to join, or when rows remain
// in midBlk: restore the group, step to the next probe row only if its build groups are
// exhausted, and leave the for loop without running its increment.
probeGrp->endIdx = probeEndIdx;
if (build->grpIdx >= buildGrpNum) {
build->grpIdx = 0;
++probeGrp->readIdx;
probeGrp->readMatch = false;
}
break;
}
if (GRP_DONE(probeGrp) || BLK_IS_FULL(pCtx->finBlk)) {
break;
}
} while (contLoop);
pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
return TSDB_CODE_SUCCESS;
}
/*
 * Dispatch the merge-style cartesian join: the sequential, filtered variant when a full
 * pre-filter is configured, the unfiltered variant otherwise.
 */
static int32_t mLeftJoinMergeCart(SMJoinMergeCtx* pCtx) {
  if (NULL != pCtx->pJoin->pFPreFilter) {
    return mLeftJoinMergeSeqCart(pCtx);
  }

  return mLeftJoinMergeFullCart(pCtx);
}
/*
 * Append the pending rows of pCtx->probeNEqGrp (probe rows without a build match) to
 * finBlk, limited by the free rows of finBlk. pCtx->grpRemains is set when some rows did
 * not fit and must be emitted by a later call.
 *
 * @return TSDB_CODE_SUCCESS, or the error from mLeftJoinGrpNonEqCart().
 */
static int32_t mLeftJoinNonEqCart(SMJoinMergeCtx* pCtx) {
  int32_t        rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
  SMJoinGrpRows* pGrp = &pCtx->probeNEqGrp;

  if (rowsLeft <= 0) {
    // No room at all: just report whether anything is still pending.
    pCtx->grpRemains = pGrp->readIdx <= pGrp->endIdx;
    return TSDB_CODE_SUCCESS;
  }

  int32_t pendingRows = GRP_REMAIN_ROWS(pGrp);
  int32_t savedEndIdx = pGrp->endIdx;
  bool    truncated = pendingRows > rowsLeft;

  pCtx->lastEqGrp = false;

  if (truncated) {
    // Emit only as many rows as fit; the group end is restored afterwards.
    pGrp->endIdx = pGrp->readIdx + rowsLeft - 1;
  }

  MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, pGrp));

  pGrp->readIdx = pGrp->endIdx + 1;
  pGrp->endIdx = savedEndIdx;
  pCtx->grpRemains = truncated;

  return TSDB_CODE_SUCCESS;
}
/*
 * Make sure a probe block and, if available, a build block are ready for joining.
 *
 * Build blocks that lie entirely before the current probe row in scan order are skipped
 * without being walked row by row. For ascending input that means the last build
 * timestamp is smaller than the current probe timestamp; for descending input it means
 * the last build timestamp is larger. (The original compared for the ascending case only.)
 *
 * @return false when the probe side is exhausted (the operator is then marked done),
 *         true otherwise - also when no build block is left.
 */
static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) {
  bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
  bool buildGot = false;
  bool asc = (pJoin->inputTsOrder == TSDB_ORDER_ASC);

  do {
    if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
      buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
    }

    if (!probeGot) {
      mJoinSetDone(pOperator);
      return false;
    }

    if (buildGot) {
      SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot);
      SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot);
      int64_t probeCurTs = *((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx);
      int64_t buildLastTs = *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1);
      bool    buildBlkPassed = asc ? (probeCurTs > buildLastTs) : (probeCurTs < buildLastTs);
      if (buildBlkPassed) {
        // Mark the build block as consumed and fetch the next one.
        pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
        continue;
      }
    }

    break;
  } while (true);

  return true;
}
/*
 * Join the single probe row probeGrp->readIdx with rows of the current hash group
 * (build->pHashCurGrp), starting at build->grpRowIdx, and write the result to `pBlk`.
 *
 * build->grpRowIdx is set to -1 once the hash group has been fully consumed, otherwise
 * it is advanced by the number of rows written so the next call resumes from there.
 *
 * @param append  true: write after the rows already in pBlk; false: write from row 0.
 * @return false when pBlk has no free row left (on entry, or because this call filled it
 *         up); true when the caller may keep adding rows.
 */
static bool mLeftJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build) {
  int32_t rowsLeft = append ? (pBlk->info.capacity - pBlk->info.rows) : pBlk->info.capacity;
  if (rowsLeft <= 0) {
    return false;
  }

  int32_t buildGrpRows = taosArrayGetSize(build->pHashCurGrp);
  int32_t grpRows = buildGrpRows - build->grpRowIdx;
  if (grpRows <= 0 || build->grpRowIdx < 0) {
    // Nothing (left) in this hash group.
    build->grpRowIdx = -1;
    return true;
  }

  int32_t actRows = TMIN(grpRows, rowsLeft);
  int32_t currRows = append ? pBlk->info.rows : 0;

  // Probe columns: the same probe value (or NULL) for every produced row.
  for (int32_t c = 0; c < probe->finNum; ++c) {
    SMJoinColMap* pFirstCol = probe->finCols + c;
    SColumnInfoData* pInCol = taosArrayGet(probeGrp->blk->pDataBlock, pFirstCol->srcSlot);
    SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pFirstCol->dstSlot);
    if (colDataIsNull_s(pInCol, probeGrp->readIdx)) {
      colDataSetNItemsNull(pOutCol, currRows, actRows);
    } else {
      colDataSetNItems(pOutCol, currRows, colDataGetData(pInCol, probeGrp->readIdx), actRows, true);
    }
  }

  // Build columns: hash-group rows may live in different blocks, so copy row by row.
  for (int32_t c = 0; c < build->finNum; ++c) {
    SMJoinColMap* pSecondCol = build->finCols + c;
    SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pSecondCol->dstSlot);
    for (int32_t r = 0; r < actRows; ++r) {
      SMJoinRowPos* pRow = taosArrayGet(build->pHashCurGrp, build->grpRowIdx + r);
      SColumnInfoData* pInCol = taosArrayGet(pRow->pBlk->pDataBlock, pSecondCol->srcSlot);
      colDataAssignNRows(pOutCol, currRows + r, pInCol, pRow->pos, 1);
    }
  }

  // Honour `append` like the sibling cart helpers do; the original always added to the
  // existing row count, which over-counts when writing from row 0.
  pBlk->info.rows = append ? (pBlk->info.rows + actRows) : actRows;

  if (actRows == grpRows) {
    build->grpRowIdx = -1;
  } else {
    build->grpRowIdx += actRows;
  }

  if (actRows == rowsLeft) {
    return false;
  }

  return true;
}
/*
 * Hash-join path without a pre-filter: for each pending probe row of the current probe
 * group, look up the build hash group by key and append the joined rows to finBlk. Probe
 * rows for which mJoinCopyKeyColsDataToBuf() returns true, or whose key is not in the
 * hash, are emitted with NULL build columns. pCtx->grpRemains reports pending probe rows.
 */
static int32_t mLeftJoinHashFullCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
// Resume a hash group that was interrupted because finBlk filled up in an earlier call.
if (build->grpRowIdx >= 0) {
bool contLoop = mLeftJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build);
// grpRowIdx < 0 means the hash group is finished, so this probe row is done.
if (build->grpRowIdx < 0) {
probeGrp->readIdx++;
}
if (!contLoop) {
goto _return;
}
}
size_t bufLen = 0;
int32_t probeEndIdx = probeGrp->endIdx;
for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) {
// NOTE(review): a true return is treated as "this row cannot match" (presumably a NULL
// key column) - confirm against mJoinCopyKeyColsDataToBuf().
if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
// Narrow the group to this one row while emitting it, then restore.
probeGrp->endIdx = probeGrp->readIdx;
MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp));
probeGrp->endIdx = probeEndIdx;
continue;
}
SArray** pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
if (NULL == pGrp) {
// Key not present on the build side: emit the probe row with NULL build columns.
probeGrp->endIdx = probeGrp->readIdx;
MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp));
probeGrp->endIdx = probeEndIdx;
} else {
build->pHashCurGrp = *pGrp;
build->grpRowIdx = 0;
bool contLoop = mLeftJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build);
if (!contLoop) {
// finBlk is full. The loop increment is skipped by the goto, so advance the probe
// row here if its hash group was completed.
if (build->grpRowIdx < 0) {
probeGrp->readIdx++;
}
goto _return;
}
}
}
_return:
pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
return TSDB_CODE_SUCCESS;
}
/*
 * Join the current probe row with the current hash group through midBlk, applying
 * pPreFilter to each batch and merging surviving rows into finBlk. If the hash group is
 * exhausted without any surviving row, the probe group is emitted with NULL build
 * columns via mLeftJoinGrpNonEqCart().
 *
 * @param contLoop  out: false when rows were left over in midBlk (pCtx->midRemains) and
 *                  the caller must stop; true when the hash group was fully processed.
 */
static int32_t mLeftJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
blockDataCleanup(pCtx->midBlk);
do {
// Produce the next batch of candidate rows; build->grpRowIdx tracks progress and turns
// negative once the hash group is exhausted.
mLeftJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build);
if (pCtx->midBlk->info.rows > 0) {
MJ_ERR_RET(doFilter(pCtx->midBlk, pCtx->pJoin->pPreFilter, NULL));
if (pCtx->midBlk->info.rows > 0) {
probeGrp->readMatch = true;
}
}
if (0 == pCtx->midBlk->info.rows) {
if (build->grpRowIdx < 0) {
// Group finished and no row ever passed the filter for this probe row.
if (!probeGrp->readMatch) {
MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp));
}
break;
}
continue;
} else {
MJ_ERR_RET(mLeftJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk));
if (pCtx->midRemains) {
// Not everything fit into finBlk; report state and let the caller return.
pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
*contLoop = false;
return TSDB_CODE_SUCCESS;
}
if (build->grpRowIdx < 0) {
break;
}
continue;
}
} while (true);
*contLoop = true;
return TSDB_CODE_SUCCESS;
}
/*
 * Hash-join path with a pre-filter: like mLeftJoinHashFullCart(), but every probe row's
 * candidates go through mLeftJoinHashGrpCartFilter() so that rows rejected by the filter
 * are dropped and a probe row with no surviving match is emitted with NULL build columns.
 * pCtx->grpRemains reports pending probe rows.
 *
 * NOTE(review): this function reads the probe group at index 0 while
 * mLeftJoinHashGrpCartFilter() reads it at probe->grpIdx - confirm both always refer to
 * the same group.
 */
static int32_t mLeftJoinHashSeqCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
bool contLoop = false;
// Resume a hash group left unfinished by an earlier call.
if (build->grpRowIdx >= 0) {
MJ_ERR_RET(mLeftJoinHashGrpCartFilter(pCtx, &contLoop));
if (build->grpRowIdx < 0) {
probeGrp->readIdx++;
probeGrp->readMatch = false;
}
if (!contLoop) {
goto _return;
}
}
size_t bufLen = 0;
int32_t probeEndIdx = probeGrp->endIdx;
// No loop increment: each branch advances readIdx itself once its probe row is done.
for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk);) {
if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
// Row cannot be matched by key: emit it alone with NULL build columns.
probeGrp->endIdx = probeGrp->readIdx;
MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp));
probeGrp->endIdx = probeEndIdx;
probeGrp->readIdx++;
probeGrp->readMatch = false;
continue;
}
SArray** pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
if (NULL == pGrp) {
// Key not present on the build side.
probeGrp->endIdx = probeGrp->readIdx;
MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp));
probeGrp->endIdx = probeEndIdx;
probeGrp->readIdx++;
probeGrp->readMatch = false;
} else {
build->pHashCurGrp = *pGrp;
build->grpRowIdx = 0;
// Narrow the group to the current row so a no-match emission covers one row only.
probeGrp->endIdx = probeGrp->readIdx;
MJ_ERR_RET(mLeftJoinHashGrpCartFilter(pCtx, &contLoop));
probeGrp->endIdx = probeEndIdx;
if (build->grpRowIdx < 0) {
probeGrp->readIdx++;
probeGrp->readMatch = false;
}
if (!contLoop) {
break;
}
}
}
_return:
pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
return TSDB_CODE_SUCCESS;
}
/*
 * Dispatch the hash-style cartesian join: the sequential, filtered variant when a
 * pre-filter (pPreFilter) is configured, the unfiltered variant otherwise.
 */
static int32_t mLeftJoinHashCart(SMJoinMergeCtx* pCtx) {
  if (NULL != pCtx->pJoin->pPreFilter) {
    return mLeftJoinHashSeqCart(pCtx);
  }

  return mLeftJoinHashFullCart(pCtx);
}
/*
 * Join all probe rows carrying `timestamp` with the build rows carrying the same
 * timestamp.
 *
 * @param lastBuildGrp  true when the build-side equal groups collected for the previous
 *                      call are reused (same timestamp as pCtx->lastEqTs); false when
 *                      they must be collected again for `timestamp`.
 * @return TSDB_CODE_SUCCESS or the first error reported by a helper.
 *
 * Hash join is chosen when key columns exist (pCtx->hashCan) and
 * REACH_HJOIN_THRESHOLD() holds; otherwise the merge-style cartesian join is used.
 */
static int32_t mLeftJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastBuildGrp) {
  SMJoinOperatorInfo* pJoin = pCtx->pJoin;

  pCtx->lastEqGrp = true;

  mJoinBuildEqGroups(pJoin->probe, timestamp, NULL, true);
  if (!lastBuildGrp) {
    // The status code used to be discarded here; propagate it like the other helpers.
    MJ_ERR_RET(mJoinRetrieveEqGrpRows(pJoin->pOperator, pJoin->build, timestamp));
  } else {
    // Reuse the existing build groups from their beginning.
    pJoin->build->grpIdx = 0;
  }

  if (pCtx->hashCan && REACH_HJOIN_THRESHOLD(pJoin->probe, pJoin->build)) {
    // Rebuild the hash unless the same build groups were already hashed last time.
    if (!lastBuildGrp || !pCtx->hashJoin) {
      MJ_ERR_RET(mJoinMakeBuildTbHash(pJoin, pJoin->build));
    }

    // Key column data is prepared once per new probe block.
    if (pJoin->probe->newBlk) {
      MJ_ERR_RET(mJoinSetKeyColsData(pJoin->probe->blk, pJoin->probe));
      pJoin->probe->newBlk = false;
    }

    pCtx->hashJoin = true;

    return mLeftJoinHashCart(pCtx);
  }

  pCtx->hashJoin = false;

  return mLeftJoinMergeCart(pCtx);
}
/*
 * Promote rows left over in midBlk by a previous call: the mid and final block pointers
 * are exchanged, so the leftovers become the start of the new output, and the midRemains
 * flag is cleared.
 *
 * @return always TSDB_CODE_SUCCESS.
 */
static int32_t mLeftJoinHandleMidRemains(SMJoinMergeCtx* pCtx) {
  ASSERT(0 < pCtx->midBlk->info.rows);

  // After the swap finBlk holds the leftover rows.
  TSWAP(pCtx->midBlk, pCtx->finBlk);

  pCtx->midRemains = false;

  return TSDB_CODE_SUCCESS;
}
/*
 * Continue emitting the group that a previous call could not finish: the pending
 * non-equal probe group, or the equal group via whichever join method (hash or merge)
 * was in use for it.
 */
static int32_t mLeftJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
  if (!pCtx->lastEqGrp) {
    return mLeftJoinNonEqCart(pCtx);
  }

  if (pCtx->hashJoin) {
    return mLeftJoinHashCart(pCtx);
  }

  return mLeftJoinMergeCart(pCtx);
}
/*
 * Produce the next result block of the left merge join.
 *
 * Order of work per call:
 *   1. reset finBlk;
 *   2. take over rows left in midBlk by the previous call (midRemains);
 *   3. finish the group the previous call could not complete (grpRemains);
 *   4. walk probe and build rows by primary timestamp: equal timestamps are joined,
 *      probe rows selected by LEFT_JOIN_NO_EQUAL() are emitted with NULL build columns,
 *      and build rows selected by LEFT_JOIN_DISCRAD() are skipped.
 * The block is returned as soon as it holds at least pCtx->blkThreshold rows, or when the
 * inputs are exhausted.
 *
 * @return pCtx->finBlk (possibly with fewer rows than the threshold at end of input), or
 *         NULL with pJoin->errCode set when `code` is non-zero at _return.
 */
SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoin = pOperator->info;
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
int32_t code = TSDB_CODE_SUCCESS;
int64_t probeTs = 0;
int64_t buildTs = 0;
SColumnInfoData* pBuildCol = NULL;
SColumnInfoData* pProbeCol = NULL;
bool asc = (pJoin->inputTsOrder == TSDB_ORDER_ASC) ? true : false;
blockDataCleanup(pCtx->finBlk);
// Step 2: leftovers from midBlk become the start of this output block.
if (pCtx->midRemains) {
MJ_ERR_JRET(mLeftJoinHandleMidRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
pCtx->midRemains = false;
}
// Step 3: continue the group that was interrupted when the previous block filled up.
if (pCtx->grpRemains) {
MJ_ERR_JRET(mLeftJoinHandleGrpRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
pCtx->grpRemains = false;
}
do {
// Fetch blocks; false means the probe side is exhausted and the join is finished.
if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) {
break;
}
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
// The new probe block continues the timestamp of the last equal group: join it against
// the build groups collected for that timestamp (lastBuildGrp = true).
if (probeTs == pCtx->lastEqTs) {
MJ_ERR_JRET(mLeftJoinProcessEqualGrp(pCtx, probeTs, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
continue;
} else {
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
}
}
while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
if (probeTs == buildTs) {
pCtx->lastEqTs = probeTs;
MJ_ERR_JRET(mLeftJoinProcessEqualGrp(pCtx, probeTs, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
} else if (LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)) {
// Collect the run of consecutive probe rows that have no build match into
// probeNEqGrp, then emit them with NULL build columns.
pCtx->probeNEqGrp.blk = pJoin->probe->blk;
pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
pCtx->probeNEqGrp.endIdx = pCtx->probeNEqGrp.beginIdx;
while (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
if (LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)) {
pCtx->probeNEqGrp.endIdx = pJoin->probe->blkRowIdx;
continue;
}
break;
}
MJ_ERR_JRET(mLeftJoinNonEqCart(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
} else {
// Skip build rows for as long as LEFT_JOIN_DISCRAD() selects them.
while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build);
if (LEFT_JOIN_DISCRAD(asc, probeTs, buildTs)) {
continue;
}
break;
}
}
}
// Build side fully fetched: every remaining probe row of this block is unmatched.
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) {
pCtx->probeNEqGrp.blk = pJoin->probe->blk;
pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
pCtx->probeNEqGrp.endIdx = pJoin->probe->blk->info.rows - 1;
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
MJ_ERR_JRET(mLeftJoinNonEqCart(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
}
} while (true);
_return:
if (code) {
pJoin->errCode = code;
return NULL;
}
return pCtx->finBlk;
}
/*
 * Make sure both a probe block and a build block are available. The build side is only
 * fetched when the probe side delivered a block or the build source still needs its
 * initial fetch.
 *
 * @return true when both sides have data; false (after marking the operator done) when
 *         either side is exhausted.
 */
static bool mInnerJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) {
  bool haveBuild = false;
  bool haveProbe = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);

  if (haveProbe || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
    haveBuild = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
  }

  if (haveProbe && haveBuild) {
    return true;
  }

  mJoinSetDone(pOperator);
  return false;
}
/*
 * Produce the next result block for the inner merge join.
 *
 * Structure mirrors mLeftJoinDo(): finish the group left over from the previous call,
 * then walk probe and build rows by primary timestamp until finBlk reaches
 * pCtx->blkThreshold rows or mInnerJoinRetrieve() reports that either input is
 * exhausted. Unlike mLeftJoinDo() there is no midRemains handling here.
 *
 * NOTE(review): this function reuses the left-join helpers, including
 * mLeftJoinNonEqCart(), which emits unmatched probe rows with NULL build columns.
 * Confirm whether that is intended for an inner join or is placeholder code.
 *
 * @return pCtx->finBlk, or NULL with pJoin->errCode set when `code` is non-zero at
 *         _return.
 */
SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoin = pOperator->info;
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
int32_t code = TSDB_CODE_SUCCESS;
int64_t probeTs = 0;
int64_t buildTs = 0;
SColumnInfoData* pBuildCol = NULL;
SColumnInfoData* pProbeCol = NULL;
bool asc = (pJoin->inputTsOrder == TSDB_ORDER_ASC) ? true : false;
blockDataCleanup(pCtx->finBlk);
// Continue the group that was interrupted when the previous block filled up.
if (pCtx->grpRemains) {
MJ_ERR_JRET(mLeftJoinHandleGrpRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
pCtx->grpRemains = false;
}
do {
// Stops as soon as either input has no more data.
if (!mInnerJoinRetrieve(pOperator, pJoin, pCtx)) {
break;
}
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
// The new probe block continues the timestamp of the last equal group.
if (probeTs == pCtx->lastEqTs) {
MJ_ERR_JRET(mLeftJoinProcessEqualGrp(pCtx, probeTs, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
continue;
} else {
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
}
}
while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
if (probeTs == buildTs) {
pCtx->lastEqTs = probeTs;
MJ_ERR_JRET(mLeftJoinProcessEqualGrp(pCtx, probeTs, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
} else if (LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)) {
// Collect the run of consecutive probe rows selected by LEFT_JOIN_NO_EQUAL().
pCtx->probeNEqGrp.blk = pJoin->probe->blk;
pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
pCtx->probeNEqGrp.endIdx = pCtx->probeNEqGrp.beginIdx;
while (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
if (LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)) {
pCtx->probeNEqGrp.endIdx = pJoin->probe->blkRowIdx;
continue;
}
break;
}
MJ_ERR_JRET(mLeftJoinNonEqCart(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
} else {
// Skip build rows for as long as LEFT_JOIN_DISCRAD() selects them.
while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build);
if (LEFT_JOIN_DISCRAD(asc, probeTs, buildTs)) {
continue;
}
break;
}
}
}
// Build side fully fetched while probe rows remain in the current block.
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) {
pCtx->probeNEqGrp.blk = pJoin->probe->blk;
pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
pCtx->probeNEqGrp.endIdx = pJoin->probe->blk->info.rows - 1;
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
MJ_ERR_JRET(mLeftJoinNonEqCart(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
}
} while (true);
_return:
if (code) {
pJoin->errCode = code;
return NULL;
}
return pCtx->finBlk;
}

View File

@ -141,7 +141,7 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi
if (E_JOIN_TB_BUILD == pTable->type) {
pTable->createdBlks = taosArrayInit(8, POINTER_BYTES);
pTable->pGrpArrays = taosArrayInit(32, POINTER_BYTES);
pTable->pGrpHash = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
pTable->pGrpHash = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
if (NULL == pTable->createdBlks || NULL == pTable->pGrpArrays || NULL == pTable->pGrpHash) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -190,26 +190,6 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin
pInfo->probe->type = E_JOIN_TB_PROBE;
}
/**
 * Set up the merge context embedded in the join operator.
 *
 * Creates the final output block from the plan node's output descriptor and
 * reserves its capacity. When a pre-filter (pFPreFilter) is present, a second
 * block with the same layout and capacity is created for intermediate rows.
 * The return threshold is set to half of the final block's capacity.
 *
 * @param pJoin     join operator whose ctx.mergeCtx is initialized
 * @param pJoinNode physical plan node supplying the output block descriptor
 * @return TSDB_CODE_SUCCESS on success, TSDB_CODE_OUT_OF_MEMORY when a block
 *         cannot be created, or the error reported while reserving capacity
 */
static int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
  SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;

  pCtx->pJoin = pJoin;
  pCtx->lastEqTs = INT64_MIN;
  pCtx->hashCan = pJoin->probe->keyNum > 0;

  pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
  if (NULL == pCtx->finBlk) {
    // Without this check the capacity call below would receive a NULL block.
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize)));

  if (pJoin->pFPreFilter) {
    // The intermediate block mirrors the final block so rows can be moved or swapped between them.
    pCtx->midBlk = createOneDataBlock(pCtx->finBlk, false);
    if (NULL == pCtx->midBlk) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    MJ_ERR_RET(blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity));
  }

  pCtx->blkThreshold = pCtx->finBlk->info.capacity * 0.5;

  return TSDB_CODE_SUCCESS;
}
static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
#if 0
pJoin->joinFps = &gMJoinFps[pJoin->joinType][pJoin->subType];
@ -223,7 +203,7 @@ static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode*
#endif
}
static void mJoinSetDone(SOperatorInfo* pOperator) {
void mJoinSetDone(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
if (pOperator->pDownstreamGetParams) {
freeOperatorParam(pOperator->pDownstreamGetParams[0], OP_GET_PARAM);
@ -233,252 +213,7 @@ static void mJoinSetDone(SOperatorInfo* pOperator) {
}
}
/**
 * Emit the remaining rows of a probe group as non-matching left-join rows.
 *
 * Probe output columns are copied from the group's block starting at
 * pGrp->readIdx; every build output column is filled with NULL for the same
 * number of rows.
 *
 * @param pJoin  join operator providing the probe/build column mappings
 * @param pRes   destination block
 * @param append true to write after the rows already in pRes, false to write from row 0
 * @param pGrp   probe row group; GRP_REMAIN_ROWS(pGrp) rows are emitted
 * @return TSDB_CODE_SUCCESS, or the error reported by colDataAssignNRows
 */
static int32_t mLeftJoinGrpNonEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp) {
  SMJoinTableCtx* probe = pJoin->probe;
  SMJoinTableCtx* build = pJoin->build;
  int32_t         currRows = append ? pRes->info.rows : 0;
  int32_t         firstRows = GRP_REMAIN_ROWS(pGrp);

  for (int32_t c = 0; c < probe->finNum; ++c) {
    SMJoinColMap*    pFirstCol = probe->finCols + c;
    SColumnInfoData* pInCol = taosArrayGet(pGrp->blk->pDataBlock, pFirstCol->srcSlot);
    SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot);
    // Propagate copy failures instead of silently reporting success.
    MJ_ERR_RET(colDataAssignNRows(pOutCol, currRows, pInCol, pGrp->readIdx, firstRows));
  }

  for (int32_t c = 0; c < build->finNum; ++c) {
    SMJoinColMap*    pSecondCol = build->finCols + c;
    SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot);
    colDataSetNItemsNull(pOutCol, currRows, firstRows);
  }

  pRes->info.rows = currRows + firstRows;

  return TSDB_CODE_SUCCESS;
}
/**
 * Write the cartesian product of a probe group and a build group into pRes.
 *
 * For each remaining probe row (from pFirst->readIdx), the probe output
 * columns are repeated once per remaining build row, and the remaining build
 * rows (from pSecond->readIdx) are copied alongside them.
 *
 * @param pJoin   join operator providing the probe/build column mappings
 * @param pRes    destination block
 * @param append  true to write after the rows already in pRes, false to write from row 0
 * @param pFirst  probe row group
 * @param pSecond build row group
 * @return TSDB_CODE_SUCCESS, or the first error reported while copying column data
 */
static int32_t mLeftJoinGrpEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond) {
  SMJoinTableCtx* probe = pJoin->probe;
  SMJoinTableCtx* build = pJoin->build;
  int32_t         currRows = append ? pRes->info.rows : 0;
  int32_t         firstRows = GRP_REMAIN_ROWS(pFirst);
  int32_t         secondRows = GRP_REMAIN_ROWS(pSecond);

  for (int32_t c = 0; c < probe->finNum; ++c) {
    SMJoinColMap*    pFirstCol = probe->finCols + c;
    SColumnInfoData* pInCol = taosArrayGet(pFirst->blk->pDataBlock, pFirstCol->srcSlot);
    SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot);
    for (int32_t r = 0; r < firstRows; ++r) {
      // The NULL check and the data fetch must address the same source row.
      // Reading from beginIdx here picked the wrong row once readIdx had advanced.
      int32_t srcRow = pFirst->readIdx + r;
      if (colDataIsNull_s(pInCol, srcRow)) {
        colDataSetNItemsNull(pOutCol, currRows + r * secondRows, secondRows);
      } else {
        MJ_ERR_RET(colDataSetNItems(pOutCol, currRows + r * secondRows, colDataGetData(pInCol, srcRow), secondRows, true));
      }
    }
  }

  for (int32_t c = 0; c < build->finNum; ++c) {
    SMJoinColMap*    pSecondCol = build->finCols + c;
    SColumnInfoData* pInCol = taosArrayGet(pSecond->blk->pDataBlock, pSecondCol->srcSlot);
    SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot);
    for (int32_t r = 0; r < firstRows; ++r) {
      MJ_ERR_RET(colDataAssignNRows(pOutCol, currRows + r * secondRows, pInCol, pSecond->readIdx, secondRows));
    }
  }

  pRes->info.rows = currRows + firstRows * secondRows;

  return TSDB_CODE_SUCCESS;
}
/**
 * Cartesian-join the probe equal-timestamp group with all build groups
 * directly into the final block (used when no pre-filter is configured).
 *
 * When the whole product fits in the free space of finBlk it is written in
 * one pass. Otherwise probe rows are processed one at a time and build groups
 * are consumed until the block is full; a partially consumed build group is
 * recorded by advancing its readIdx. pCtx->grpRemains reports whether probe
 * rows are left for a later call.
 *
 * @param pCtx merge context
 * @return TSDB_CODE_SUCCESS, or the error reported by mLeftJoinGrpEqCart
 */
static int32_t mLeftJoinMergeFullCart(SMJoinMergeCtx* pCtx) {
  int32_t         rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;
  SMJoinGrpRows*  probeGrp = taosArrayGet(probe->eqGrps, 0);
  int32_t         buildGrpNum = taosArrayGetSize(build->eqGrps);
  int32_t         probeRows = GRP_REMAIN_ROWS(probeGrp);
  int32_t         probeEndIdx = probeGrp->endIdx;

  // Fast path: the entire product fits into the remaining space.
  if (probeRows * build->grpTotalRows <= rowsLeft) {
    for (; build->grpIdx < buildGrpNum; ++build->grpIdx) {
      SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
      MJ_ERR_RET(mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
    }

    pCtx->grpRemains = false;
    return TSDB_CODE_SUCCESS;
  }

  // Slow path: one probe row at a time (endIdx is narrowed to readIdx).
  for (; !GRP_DONE(probeGrp); ++probeGrp->readIdx, build->grpIdx = 0) {
    probeGrp->endIdx = probeGrp->readIdx;
    for (; build->grpIdx < buildGrpNum; ++build->grpIdx) {
      SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);

      if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
        MJ_ERR_RET(mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
        rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
        continue;
      }

      // Only part of this build group fits: emit rowsLeft rows and remember the position.
      int32_t buildEndIdx = buildGrp->endIdx;
      buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
      // The error code was previously dropped here; propagate it as the other call sites do.
      MJ_ERR_RET(mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
      buildGrp->readIdx += rowsLeft;
      buildGrp->endIdx = buildEndIdx;
      rowsLeft = 0;
      break;
    }

    if (rowsLeft <= 0) {
      break;
    }
  }

  probeGrp->endIdx = probeEndIdx;
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;

  return TSDB_CODE_SUCCESS;
}
/**
 * Merge the intermediate block and the final block into one.
 *
 * The block holding fewer rows is appended to the one holding more. If both
 * fit, the smaller block is reset and pCtx->midRemains is cleared. Otherwise
 * only as many trailing rows as fit are moved, those rows are dropped from the
 * smaller block, and pCtx->midRemains is set. On return *ppFin always refers
 * to the fuller block (the pointers are swapped if necessary).
 *
 * @param pCtx  merge context; midRemains is updated
 * @param ppMid in/out pointer to the intermediate block
 * @param ppFin in/out pointer to the final block
 * @return TSDB_CODE_SUCCESS, or the error reported by the block merge helpers
 */
static int32_t mLeftJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) {
  SSDataBlock* pLess = NULL;
  SSDataBlock* pMore = NULL;
  if ((*ppMid)->info.rows < (*ppFin)->info.rows) {
    pLess = (*ppMid);
    pMore = (*ppFin);
  } else {
    pLess = (*ppFin);
    pMore = (*ppMid);
  }

  int32_t totalRows = pMore->info.rows + pLess->info.rows;
  if (totalRows <= pMore->info.capacity) {
    MJ_ERR_RET(blockDataMerge(pMore, pLess));
    blockDataReset(pLess);
    pCtx->midRemains = false;
  } else {
    int32_t copyRows = pMore->info.capacity - pMore->info.rows;
    MJ_ERR_RET(blockDataMergeNRows(pMore, pLess, pLess->info.rows - copyRows, copyRows));
    // The trailing copyRows rows now live in pMore; remove them from pLess so
    // they are not returned a second time when the remainder is flushed.
    blockDataShrinkNRows(pLess, copyRows);
    pCtx->midRemains = true;
  }

  if (pMore != (*ppFin)) {
    TSWAP(*ppMid, *ppFin);
  }

  return TSDB_CODE_SUCCESS;
}
// Cartesian-join the current probe group with the build groups one probe row
// at a time, staging rows in midBlk so that pFPreFilter can be applied before
// the surviving rows are merged into finBlk. A probe row for which no staged
// row survives the filter is emitted through mLeftJoinGrpNonEqCart.
// Sets pCtx->grpRemains to tell the caller whether probe rows are left.
// Returns TSDB_CODE_SUCCESS or the first error raised by a helper.
static int32_t mLeftJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
// Saved so the single-row narrowing of endIdx below can be undone on exit.
int32_t probeEndIdx = probeGrp->endIdx;
// Free rows in the staging block for the current pass.
int32_t rowsLeft = pCtx->midBlk->info.capacity;
bool contLoop = true;
blockDataReset(pCtx->midBlk);
do {
// Advance one probe row per iteration; the build group cursor restarts at 0 for each new row.
for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx, probeGrp->readMatch = false, build->grpIdx = 0) {
// Restrict the probe group to the single row at readIdx.
probeGrp->endIdx = probeGrp->readIdx;
for (; build->grpIdx < buildGrpNum; ++build->grpIdx) {
SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
MJ_ERR_RET(mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
continue;
}
// Staging block cannot hold the whole build group: stage rowsLeft rows and
// record the resume position in buildGrp->readIdx.
int32_t buildEndIdx = buildGrp->endIdx;
buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
MJ_ERR_RET(mLeftJoinGrpEqCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
buildGrp->readIdx += rowsLeft;
buildGrp->endIdx = buildEndIdx;
rowsLeft = 0;
break;
}
if (pCtx->midBlk->info.rows > 0) {
MJ_ERR_RET(doFilter(pCtx->midBlk, pCtx->pJoin->pFPreFilter, NULL));
// Any row surviving the filter marks this probe row as matched.
if (pCtx->midBlk->info.rows > 0) {
probeGrp->readMatch = true;
}
}
if (0 == pCtx->midBlk->info.rows) {
// Nothing staged survived. If every build group was visited, finish this probe row.
if (build->grpIdx == buildGrpNum) {
if (!probeGrp->readMatch) {
MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp));
}
continue;
}
// Build groups remain for this probe row: leave the for loop without advancing readIdx.
break;
} else {
MJ_ERR_RET(mLeftJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk));
// Rows were left behind in midBlk: stop and let the caller return finBlk first.
if (pCtx->midRemains) {
contLoop = false;
break;
}
if (build->grpIdx == buildGrpNum) {
continue;
}
break;
}
}
if (GRP_DONE(probeGrp) || BLK_IS_FULL(pCtx->finBlk)) {
break;
}
// Start a new staging pass with the full midBlk capacity available.
rowsLeft = pCtx->midBlk->info.capacity;
} while (contLoop);
probeGrp->endIdx = probeEndIdx;
pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
return TSDB_CODE_SUCCESS;
}
/**
 * Dispatch the merge-based cartesian join: the filtered (sequential) variant
 * when a pre-filter is configured, the direct variant otherwise.
 */
static int32_t mLeftJoinMergeCart(SMJoinMergeCtx* pCtx) {
  if (NULL != pCtx->pJoin->pFPreFilter) {
    return mLeftJoinMergeSeqCart(pCtx);
  }

  return mLeftJoinMergeFullCart(pCtx);
}
/**
 * Emit rows of the pending non-matching probe group (pCtx->probeNEqGrp) into
 * finBlk, limited by the free space of the block.
 *
 * Clears pCtx->lastEqGrp, advances the group's readIdx past the emitted rows
 * and sets pCtx->grpRemains when some rows did not fit.
 *
 * @return TSDB_CODE_SUCCESS, or the error reported by mLeftJoinGrpNonEqCart
 */
static int32_t mLeftJoinNonEqCart(SMJoinMergeCtx* pCtx) {
  SMJoinGrpRows* pNEqGrp = &pCtx->probeNEqGrp;
  int32_t        freeRows = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
  int32_t        pendingRows = GRP_REMAIN_ROWS(pNEqGrp);
  int32_t        savedEndIdx = pNEqGrp->endIdx;
  bool           truncated = pendingRows > freeRows;

  pCtx->lastEqGrp = false;

  if (truncated) {
    // Narrow the group to the rows that still fit into the block.
    pNEqGrp->endIdx = pNEqGrp->readIdx + freeRows - 1;
  }

  MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, pNEqGrp));

  pNEqGrp->readIdx = pNEqGrp->endIdx + 1;
  pNEqGrp->endIdx = savedEndIdx;
  pCtx->grpRemains = truncated;

  return TSDB_CODE_SUCCESS;
}
static bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb) {
bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb) {
if (pTb->dsFetchDone) {
return (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows) ? false : true;
}
@ -492,6 +227,8 @@ static bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBl
*pIdx = 0;
if (NULL == (*ppBlk)) {
pTb->dsFetchDone = true;
} else {
pTb->newBlk = true;
}
return ((*ppBlk) == NULL) ? false : true;
@ -500,35 +237,6 @@ static bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBl
return true;
}
/**
 * Make sure a probe block (and, where available, a usable build block) is
 * loaded before the merge step.
 *
 * Build blocks are fetched repeatedly while the current probe timestamp is
 * greater than the last timestamp of the fetched build block. When no probe
 * data is available the operator is marked done.
 *
 * @return false when the probe side is exhausted, true otherwise
 */
static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) {
  bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
  bool buildGot = false;

  for (;;) {
    if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
      buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
    }

    if (!probeGot) {
      mJoinSetDone(pOperator);
      return false;
    }

    if (!buildGot) {
      return true;
    }

    SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot);
    SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot);
    int64_t          probeCurTs = ((int64_t*)pProbeCol->pData)[pJoin->probe->blkRowIdx];
    int64_t          buildLastTs = ((int64_t*)pBuildCol->pData)[pJoin->build->blk->info.rows - 1];

    // Stop once the build block can reach the current probe timestamp;
    // otherwise loop to fetch the next build block.
    if (probeCurTs <= buildLastTs) {
      return true;
    }
  }
}
static void mJoinDestroyCreatedBlks(SArray* pCreatedBlks) {
int32_t blkNum = taosArrayGetSize(pCreatedBlks);
for (int32_t i = 0; i < blkNum; ++i) {
@ -537,19 +245,23 @@ static void mJoinDestroyCreatedBlks(SArray* pCreatedBlks) {
taosArrayClear(pCreatedBlks);
}
static void mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) {
void mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) {
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot);
SMJoinGrpRows* pGrp = NULL;
if (*(int64_t*)colDataGetData(pCol, pTable->blkRowIdx) != timestamp) {
return;
}
if (restart) {
pTable->grpTotalRows = 0;
pTable->grpIdx = 0;
mJoinDestroyCreatedBlks(pTable->createdBlks);
pGrp = taosArrayGet(pTable->eqGrps, 0);
} else {
pGrp = taosArrayReserve(pTable->eqGrps, 1);
taosArrayClear(pTable->eqGrps);
}
pGrp = taosArrayReserve(pTable->eqGrps, 1);
pGrp->beginIdx = pTable->blkRowIdx++;
pGrp->readIdx = pGrp->beginIdx;
pGrp->endIdx = pGrp->beginIdx;
@ -584,7 +296,7 @@ static void mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool*
}
static int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableCtx* pTable, int64_t timestamp) {
int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableCtx* pTable, int64_t timestamp) {
bool wholeBlk = false;
mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, true);
@ -607,7 +319,7 @@ static int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableCtx*
return TSDB_CODE_SUCCESS;
}
static int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable) {
int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable) {
for (int32_t i = 0; i < pTable->keyNum; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot);
if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) {
@ -628,7 +340,7 @@ static int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable)
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen) {
bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen) {
char *pData = NULL;
size_t bufLen = 0;
@ -705,11 +417,13 @@ static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDat
}
static int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
size_t bufLen = 0;
tSimpleHashClear(pJoin->build->pGrpHash);
pJoin->build->grpArrayIdx = 0;
pJoin->build->grpRowIdx = -1;
int32_t grpNum = taosArrayGetSize(pTable->eqGrps);
for (int32_t g = 0; g < grpNum; ++g) {
@ -718,7 +432,7 @@ static int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* p
int32_t grpRows = GRP_REMAIN_ROWS(pGrp);
for (int32_t r = 0; r < grpRows; ++r) {
if (mJoinCopyKeyColsDataToBuf(pTable, r, &bufLen)) {
if (mJoinCopyKeyColsDataToBuf(pTable, pGrp->beginIdx + r, &bufLen)) {
continue;
}
@ -729,362 +443,6 @@ static int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* p
return TSDB_CODE_SUCCESS;
}
/**
 * Join the probe row at probeGrp->readIdx with the rows of the current hash
 * group (build->pHashCurGrp), starting at build->grpRowIdx.
 *
 * As many rows as fit into pBlk are written. build->grpRowIdx is set to -1
 * when the hash group has been fully consumed, otherwise it is advanced by
 * the number of rows written so a later call can resume.
 *
 * @param pBlk     destination block
 * @param probeGrp probe group; only the row at readIdx is used
 * @param append   true to write after the rows already in pBlk, false to write from row 0
 * @param probe    probe table context (output column mapping)
 * @param build    build table context (hash group, cursor, output column mapping)
 * @return false when pBlk has no free rows left (before or after writing), true otherwise
 */
static bool mLeftJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build) {
  int32_t rowsLeft = append ? (pBlk->info.capacity - pBlk->info.rows) : pBlk->info.capacity;
  if (rowsLeft <= 0) {
    return false;
  }

  int32_t buildGrpRows = taosArrayGetSize(build->pHashCurGrp);
  int32_t grpRows = buildGrpRows - build->grpRowIdx;
  if (grpRows <= 0) {
    return true;
  }

  int32_t actRows = TMIN(grpRows, rowsLeft);
  int32_t currRows = append ? pBlk->info.rows : 0;

  for (int32_t c = 0; c < probe->finNum; ++c) {
    SMJoinColMap*    pFirstCol = probe->finCols + c;
    SColumnInfoData* pInCol = taosArrayGet(probeGrp->blk->pDataBlock, pFirstCol->srcSlot);
    SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pFirstCol->dstSlot);
    if (colDataIsNull_s(pInCol, probeGrp->readIdx)) {
      colDataSetNItemsNull(pOutCol, currRows, actRows);
    } else {
      colDataSetNItems(pOutCol, currRows, colDataGetData(pInCol, probeGrp->readIdx), actRows, true);
    }
  }

  for (int32_t c = 0; c < build->finNum; ++c) {
    SMJoinColMap*    pSecondCol = build->finCols + c;
    SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pSecondCol->dstSlot);
    for (int32_t r = 0; r < actRows; ++r) {
      // Read from the resume position: rows before grpRowIdx were already
      // emitted by an earlier call and must not be produced again.
      SMJoinRowPos*    pRow = taosArrayGet(build->pHashCurGrp, build->grpRowIdx + r);
      SColumnInfoData* pInCol = taosArrayGet(pRow->pBlk->pDataBlock, pSecondCol->srcSlot);
      colDataAssignNRows(pOutCol, currRows + r, pInCol, pRow->pos, 1);
    }
  }

  // Based on currRows so that the count is also right when append is false.
  pBlk->info.rows = currRows + actRows;

  if (actRows == grpRows) {
    build->grpRowIdx = -1;
  } else {
    build->grpRowIdx += actRows;
  }

  if (actRows == rowsLeft) {
    return false;
  }

  return true;
}
// Hash-based join of the current probe group against build->pGrpHash, writing
// directly into finBlk (no pre-filter). First resumes a partially emitted hash
// group if build->grpRowIdx >= 0, then walks the remaining probe rows: a row
// with no hash entry is emitted through mLeftJoinGrpNonEqCart, a row with an
// entry is joined with that entry's rows. Sets pCtx->grpRemains on exit.
// Returns TSDB_CODE_SUCCESS or the error from mLeftJoinGrpNonEqCart.
static int32_t mLeftJoinHashFullCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
// A non-negative grpRowIdx means a previous call stopped in the middle of a hash group.
if (build->grpRowIdx >= 0) {
bool contLoop = mLeftJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build);
// grpRowIdx < 0 after the call: the hash group is finished, move to the next probe row.
if (build->grpRowIdx < 0) {
probeGrp->readIdx++;
}
if (!contLoop) {
goto _return;
}
}
size_t bufLen = 0;
int32_t probeEndIdx = probeGrp->endIdx;
for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) {
// A true return skips this probe row (presumably a key that cannot be hashed, e.g. NULL - confirm).
if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
continue;
}
SArray** pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
if (NULL == pGrp) {
// No build rows for this key: emit just this probe row with NULL build columns.
probeGrp->endIdx = probeGrp->readIdx;
MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp));
probeGrp->endIdx = probeEndIdx;
} else {
build->pHashCurGrp = *pGrp;
build->grpRowIdx = 0;
// NOTE(review): when the call returns false and the hash group was also fully
// consumed (grpRowIdx reset to -1), readIdx is not advanced here, so the next
// call appears to look up the same probe row again - confirm.
if (!mLeftJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build)) {
break;
}
}
}
_return:
pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
return TSDB_CODE_SUCCESS;
}
// Join the current probe row with the current hash group through midBlk,
// applying the pre-filter to each staged batch and merging surviving rows
// into finBlk. Loops until the hash group is consumed or rows remain in
// midBlk after merging.
// *contLoop is set to false when the caller must stop (midBlk still holds
// rows), true when the caller may continue with the next probe row.
// Returns TSDB_CODE_SUCCESS or the first error from a helper.
static int32_t mLeftJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
blockDataReset(pCtx->midBlk);
do {
mLeftJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build);
// grpRowIdx < 0: the hash group is fully staged, so the probe cursor moves on.
if (build->grpRowIdx < 0) {
probeGrp->readIdx++;
}
if (pCtx->midBlk->info.rows > 0) {
// NOTE(review): this reads pJoin->pPreFilter while the merge variants use pJoin->pFPreFilter - confirm which is intended.
MJ_ERR_RET(doFilter(pCtx->midBlk, pCtx->pJoin->pPreFilter, NULL));
if (pCtx->midBlk->info.rows > 0) {
probeGrp->readMatch = true;
}
}
if (0 == pCtx->midBlk->info.rows) {
if (build->grpRowIdx < 0) {
// Hash group done and nothing ever passed the filter: emit a non-match row.
// NOTE(review): readIdx was already incremented above, so probeGrp here may no
// longer describe the row that was just processed - confirm.
if (!probeGrp->readMatch) {
MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp));
}
probeGrp->readMatch = false;
break;
}
continue;
} else {
MJ_ERR_RET(mLeftJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk));
// Rows stayed behind in midBlk: report the state and let the caller return finBlk.
if (pCtx->midRemains) {
pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
*contLoop = false;
return TSDB_CODE_SUCCESS;
}
if (build->grpRowIdx < 0) {
probeGrp->readMatch = false;
break;
}
continue;
}
} while (true);
*contLoop = true;
return TSDB_CODE_SUCCESS;
}
/**
 * Hash-based join of the probe group against build->pGrpHash when a
 * pre-filter is configured: matching rows are staged and filtered by
 * mLeftJoinHashGrpCartFilter, probe rows without a hash entry are emitted
 * through mLeftJoinGrpNonEqCart.
 *
 * A partially processed hash group (build->grpRowIdx >= 0) is resumed first.
 * pCtx->grpRemains is set on exit to tell the caller whether probe rows are left.
 *
 * @param pCtx merge context
 * @return TSDB_CODE_SUCCESS, or the first error reported by a helper
 */
static int32_t mLeftJoinHashSeqCart(SMJoinMergeCtx* pCtx) {
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;
  SMJoinGrpRows*  probeGrp = taosArrayGet(probe->eqGrps, 0);
  bool            contLoop = false;

  if (build->grpRowIdx >= 0) {
    MJ_ERR_RET(mLeftJoinHashGrpCartFilter(pCtx, &contLoop));
    if (!contLoop) {
      goto _return;
    }
  }

  size_t  bufLen = 0;
  int32_t probeEndIdx = probeGrp->endIdx;
  // The loop header has no increment: each branch advances readIdx itself.
  for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk);) {
    if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
      // Skip this probe row, as mLeftJoinHashFullCart does. Without advancing
      // readIdx the loop would re-test the same row forever.
      // NOTE(review): confirm whether a skipped row should instead be emitted
      // as a non-matching left-join row.
      probeGrp->readIdx++;
      continue;
    }

    SArray** pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
    if (NULL == pGrp) {
      // No build rows for this key: emit only this probe row with NULL build columns.
      probeGrp->endIdx = probeGrp->readIdx;
      MJ_ERR_RET(mLeftJoinGrpNonEqCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp));
      probeGrp->endIdx = probeEndIdx;
      probeGrp->readIdx++;
      probeGrp->readMatch = false;
    } else {
      build->pHashCurGrp = *pGrp;
      build->grpRowIdx = 0;
      MJ_ERR_RET(mLeftJoinHashGrpCartFilter(pCtx, &contLoop));
      if (!contLoop) {
        break;
      }
    }
  }

_return:

  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;

  return TSDB_CODE_SUCCESS;
}
/**
 * Dispatch the hash-based cartesian join: the filtered (sequential) variant
 * when pPreFilter is set, the direct variant otherwise.
 */
static int32_t mLeftJoinHashCart(SMJoinMergeCtx* pCtx) {
  if (NULL != pCtx->pJoin->pPreFilter) {
    return mLeftJoinHashSeqCart(pCtx);
  }

  return mLeftJoinHashFullCart(pCtx);
}
/**
 * Process one group of probe rows whose timestamp equals a build timestamp.
 *
 * Rebuilds the probe equal group for the timestamp. Unless the build group of
 * the previous call is being reused (lastBuildGrp), the build rows for the
 * timestamp are collected as well. The join is then run through the hash path
 * when hashing is possible and REACH_HJOIN_THRESHOLD holds, otherwise through
 * the merge path.
 *
 * @param pCtx         merge context; lastEqGrp (and hashJoin on the hash path) are set
 * @param timestamp    timestamp shared by the probe and build rows
 * @param lastBuildGrp true to reuse the build group collected by the previous call
 * @return TSDB_CODE_SUCCESS, or the first error reported by a helper
 */
static int32_t mLeftJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastBuildGrp) {
  SMJoinOperatorInfo* pJoin = pCtx->pJoin;

  pCtx->lastEqGrp = true;

  mJoinBuildEqGroups(pJoin->probe, timestamp, NULL, true);
  if (!lastBuildGrp) {
    // Collecting build rows can fail; do not join against an incomplete group.
    MJ_ERR_RET(mJoinRetrieveEqGrpRows(pJoin->pOperator, pJoin->build, timestamp));
  } else {
    // Reusing the previous build group: restart its cursor.
    pJoin->build->grpIdx = 0;
  }

  if (pCtx->hashCan && REACH_HJOIN_THRESHOLD(pJoin->probe, pJoin->build)) {
    if (!lastBuildGrp || NULL == pJoin->build->pGrpHash) {
      MJ_ERR_RET(mJoinMakeBuildTbHash(pJoin, pJoin->build));
      MJ_ERR_RET(mJoinSetKeyColsData(pJoin->probe->blk, pJoin->probe));
    }

    pCtx->hashJoin = true;

    return mLeftJoinHashCart(pCtx);
  }

  return mLeftJoinMergeCart(pCtx);
}
/**
 * Promote the rows left in the intermediate block by swapping it with the
 * final block.
 *
 * @return true when the final block is still below the return threshold,
 *         false when it has reached it
 */
static bool mLeftJoinHandleMidRemains(SMJoinMergeCtx* pCtx) {
  ASSERT(0 < pCtx->midBlk->info.rows);

  TSWAP(pCtx->midBlk, pCtx->finBlk);

  bool belowThreshold = pCtx->finBlk->info.rows < pCtx->blkThreshold;
  return belowThreshold;
}
/**
 * Continue emitting the group left unfinished by the previous call: a
 * non-matching probe group, or an equal-timestamp group via the hash or
 * merge path it was started with.
 */
static int32_t mLeftJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
  if (!pCtx->lastEqGrp) {
    return mLeftJoinNonEqCart(pCtx);
  }

  if (pCtx->hashJoin) {
    return mLeftJoinHashCart(pCtx);
  }

  return mLeftJoinMergeCart(pCtx);
}
// Produce the next left-join result block.
// Order of work: flush rows left over in midBlk, finish a group left
// unfinished by the previous call, then repeatedly fetch probe/build blocks
// and merge them by timestamp. Returns pCtx->finBlk as soon as it holds at
// least blkThreshold rows, or when the input is exhausted. On error the code
// is stored in pJoin->errCode and NULL is returned.
static SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoin = pOperator->info;
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
int32_t code = TSDB_CODE_SUCCESS;
int64_t probeTs = 0;
int64_t buildTs = 0;
SColumnInfoData* pBuildCol = NULL;
SColumnInfoData* pProbeCol = NULL;
bool asc = (pJoin->inputTsOrder == TSDB_ORDER_ASC) ? true : false;
blockDataReset(pCtx->finBlk);
if (pCtx->midRemains) {
// NOTE(review): mLeftJoinHandleMidRemains returns bool, not an error code. If
// MJ_ERR_JRET treats any non-zero value as a failure, a `true` result would
// jump to _return and be reported as an error - confirm.
MJ_ERR_JRET(mLeftJoinHandleMidRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
}
if (pCtx->grpRemains) {
MJ_ERR_JRET(mLeftJoinHandleGrpRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
}
do {
// Stop when the probe side has no more data.
if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) {
break;
}
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
// The probe block starts with the timestamp of the last equal group:
// join it against the build group kept from the previous call.
if (probeTs == pCtx->lastEqTs) {
MJ_ERR_JRET(mLeftJoinProcessEqualGrp(pCtx, probeTs, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
continue;
} else {
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
}
}
// Merge step over the rows of the two current blocks.
while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
if (probeTs == buildTs) {
pCtx->lastEqTs = probeTs;
MJ_ERR_JRET(mLeftJoinProcessEqualGrp(pCtx, probeTs, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
} else if (LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)) {
// Collect the run of consecutive probe rows that satisfy LEFT_JOIN_NO_EQUAL
// against the current build timestamp and emit them as non-matching rows.
pCtx->probeNEqGrp.blk = pJoin->probe->blk;
pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
pCtx->probeNEqGrp.endIdx = pCtx->probeNEqGrp.beginIdx;
while (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
if (LEFT_JOIN_NO_EQUAL(asc, probeTs, buildTs)) {
pCtx->probeNEqGrp.endIdx = pJoin->probe->blkRowIdx;
continue;
}
break;
}
MJ_ERR_JRET(mLeftJoinNonEqCart(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
} else {
// Skip build rows for as long as LEFT_JOIN_DISCRAD holds for the current probe timestamp.
while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build);
if (LEFT_JOIN_DISCRAD(asc, probeTs, buildTs)) {
continue;
}
break;
}
}
}
// Probe rows are left while the inner loop has ended: emit all of them as
// non-matching rows.
// NOTE(review): unlike the L877 variant, this does not check build->dsFetchDone
// before flushing the remaining probe rows - confirm this is intended.
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
pCtx->probeNEqGrp.blk = pJoin->probe->blk;
pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
pCtx->probeNEqGrp.endIdx = pJoin->probe->blk->info.rows - 1;
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
MJ_ERR_JRET(mLeftJoinNonEqCart(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
}
} while (true);
_return:
if (code) {
pJoin->errCode = code;
return NULL;
}
return pCtx->finBlk;
}
void mJoinResetTableCtx(SMJoinTableCtx* pCtx) {
pCtx->dsInitDone = false;
pCtx->dsFetchDone = false;
@ -1140,6 +498,7 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) {
pBlock = mLeftJoinDo(pOperator);
if (NULL == pBlock) {
if (pJoin->errCode) {
ASSERT(0);
T_LONG_JMP(pOperator->pTaskInfo->env, pJoin->errCode);
}
break;
@ -1162,7 +521,18 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) {
// Release the resources held by a merge-join operator: the final and
// intermediate result blocks, and for both the probe and build tables the
// created blocks, their array, and the group hash. Finally frees `param`.
// NOTE(review): `param` is cast both to SMJoinOperatorInfo* and to
// SOperatorInfo* below; only one of these can match what the caller passes -
// confirm which one is intended.
void destroyMergeJoinOperator(void* param) {
SMJoinOperatorInfo* pJoinOperator = (SMJoinOperatorInfo*)param;
SOperatorInfo* pOperator = (SOperatorInfo*)param;
SMJoinOperatorInfo* pJoin = pOperator->info;
// The result of blockDataDestroy is stored back into the fields it was called on.
pJoin->ctx.mergeCtx.finBlk = blockDataDestroy(pJoin->ctx.mergeCtx.finBlk);
pJoin->ctx.mergeCtx.midBlk = blockDataDestroy(pJoin->ctx.mergeCtx.midBlk);
mJoinDestroyCreatedBlks(pJoin->probe->createdBlks);
taosArrayDestroy(pJoin->probe->createdBlks);
tSimpleHashCleanup(pJoin->probe->pGrpHash);
mJoinDestroyCreatedBlks(pJoin->build->createdBlks);
taosArrayDestroy(pJoin->build->createdBlks);
tSimpleHashCleanup(pJoin->build->pGrpHash);
taosMemoryFreeClear(param);
}

View File

@ -1,23 +1,23 @@
MESSAGE(STATUS "build parser unit test")
IF(NOT TD_DARWIN)
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(executorTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
executorTest
PRIVATE os util common transport gtest taos_static qcom executor function planner scalar nodes vnode
)
TARGET_INCLUDE_DIRECTORIES(
executorTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/executor/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/executor/inc"
)
ENDIF ()
# IF(NOT TD_DARWIN)
# # GoogleTest requires at least C++11
# SET(CMAKE_CXX_STANDARD 11)
# AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
#
# ADD_EXECUTABLE(executorTest ${SOURCE_LIST})
# TARGET_LINK_LIBRARIES(
# executorTest
# PRIVATE os util common transport gtest taos_static qcom executor function planner scalar nodes vnode
# )
#
# TARGET_INCLUDE_DIRECTORIES(
# executorTest
# PUBLIC "${TD_SOURCE_DIR}/include/libs/executor/"
# PRIVATE "${TD_SOURCE_DIR}/source/libs/executor/inc"
# )
# ENDIF ()
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
@ -25,7 +25,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(joinTests joinTests.cpp)
TARGET_LINK_LIBRARIES(
joinTests
PRIVATE os util common executor gtest_main qcom
PRIVATE os util common executor gtest_main qcom function planner scalar nodes vnode
)
TARGET_INCLUDE_DIRECTORIES(

File diff suppressed because it is too large Load Diff

View File

@ -1079,7 +1079,10 @@ static int32_t pdcJoinAddPreFilterColsToTarget(SOptimizeContext* pCxt, SJoinLogi
if (NULL == pCondCols) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
code = nodesCollectColumnsFromNode(pJoin->pFullOnCond, NULL, COLLECT_COL_TYPE_ALL, &pCondCols);
code = nodesCollectColumnsFromNode(pJoin->pColOnCond, NULL, COLLECT_COL_TYPE_ALL, &pCondCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesCollectColumnsFromNode(pJoin->pTagOnCond, NULL, COLLECT_COL_TYPE_ALL, &pCondCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pCondCols, &pTargets);