enh: support cart in join

This commit is contained in:
dapan1121 2023-12-07 19:22:46 +08:00
parent 840690b59b
commit d7535f832a
5 changed files with 274 additions and 912 deletions

View File

@ -34,6 +34,7 @@ typedef struct SBlockOrderInfo {
#define NBIT (3u)
#define BitPos(_n) ((_n) & ((1 << NBIT) - 1))
#define CharPos(r_) ((r_) >> NBIT)
#define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT])
#define colDataIsNull_f(bm_, r_) ((BMCharPos(bm_, r_) & (1u << (7u - BitPos(r_)))) == (1u << (7u - BitPos(r_))))

View File

@ -480,7 +480,7 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
}
int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx, int32_t numOfRows) {
if (pDst->info.type != pSrc->info.type || pSrc->reassigned) {
if (pDst->info.type != pSrc->info.type || pDst->info.bytes != pSrc->info.bytes || pSrc->reassigned) {
return TSDB_CODE_FAILED;
}
@ -489,25 +489,108 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI
}
if (IS_VAR_DATA_TYPE(pDst->info.type)) {
memcpy(pDst->varmeta.offset, pSrc->varmeta.offset, sizeof(int32_t) * numOfRows);
if (pDst->varmeta.allocLen < pSrc->varmeta.length) {
char* tmp = taosMemoryRealloc(pDst->pData, pSrc->varmeta.length);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t allLen = 0;
if (pSrc->hasNull) {
for (int32_t i = 0; i < numOfRows; ++i) {
if (colDataIsNull_var(pSrc, srcIdx + i)) {
pDst->varmeta.offset[dstIdx + i] = -1;
pDst->hasNull = true;
continue;
}
pDst->pData = tmp;
pDst->varmeta.allocLen = pSrc->varmeta.length;
char* pData = colDataGetVarData(pSrc, srcIdx + i);
int32_t dataLen = 0;
if (pSrc->info.type == TSDB_DATA_TYPE_JSON) {
dataLen = getJsonValueLen(pData);
} else {
dataLen = varDataTLen(pData);
}
pDst->varmeta.offset[dstIdx + i] = pDst->varmeta.length + allLen;
allLen += dataLen;
}
} else {
for (int32_t i = 0; i < numOfRows; ++i) {
char* pData = colDataGetVarData(pSrc, srcIdx + i);
int32_t dataLen = 0;
if (pSrc->info.type == TSDB_DATA_TYPE_JSON) {
dataLen = getJsonValueLen(pData);
} else {
dataLen = varDataTLen(pData);
}
pDst->varmeta.offset[dstIdx + i] = pDst->varmeta.length + allLen;
allLen += dataLen;
}
}
pDst->varmeta.length = pSrc->varmeta.length;
if (pDst->pData != NULL && pSrc->pData != NULL) {
memcpy(pDst->pData, pSrc->pData, pSrc->varmeta.length);
if (allLen > 0) {
// copy data
if (pDst->varmeta.allocLen < pDst->varmeta.length + allLen) {
char* tmp = taosMemoryRealloc(pDst->pData, pDst->varmeta.length + allLen);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pDst->pData = tmp;
pDst->varmeta.allocLen = pDst->varmeta.length + allLen;
}
memcpy(pDst->pData + pDst->varmeta.length, pSrc->pData, allLen);
pDst->varmeta.length = pDst->varmeta.length + allLen;
}
} else {
memcpy(pDst->nullbitmap, pSrc->nullbitmap, BitmapLen(numOfRows));
if (pSrc->hasNull) {
if (0 == BitPos(dstIdx) && 0 == BitPos(srcIdx)) {
memcpy(BMCharPos(pDst->nullbitmap, dstIdx), BMCharPos(pSrc->nullbitmap, srcIdx), BitmapLen(numOfRows));
if (!pDst->hasNull) {
int32_t nullBytes = BitmapLen(numOfRows);
int32_t startPos = CharPos(dstIdx);
for (int32_t i = 0; i < nullBytes; ++i) {
if (pDst->nullbitmap[startPos + i]) {
pDst->hasNull = true;
break;
}
}
}
} else if (BitPos(dstIdx) == BitPos(srcIdx)) {
for (int32_t i = 0; i < numOfRows; ++i) {
if (0 == BitPos(dstIdx)) {
memcpy(BMCharPos(pDst->nullbitmap, dstIdx + i), BMCharPos(pSrc->nullbitmap, srcIdx + i), BitmapLen(numOfRows - i));
if (!pDst->hasNull) {
int32_t nullBytes = BitmapLen(numOfRows - i);
int32_t startPos = CharPos(dstIdx + i);
for (int32_t m = 0; m < nullBytes; ++m) {
if (pDst->nullbitmap[startPos + m]) {
pDst->hasNull = true;
break;
}
}
}
break;
} else {
if (colDataIsNull_f(pSrc->nullbitmap, srcIdx + i)) {
colDataSetNull_f(pDst->nullbitmap, dstIdx + i);
pDst->hasNull = true;
} else {
colDataClearNull_f(pDst->nullbitmap, dstIdx + i);
}
}
}
} else {
for (int32_t i = 0; i < numOfRows; ++i) {
if (colDataIsNull_f(pSrc->nullbitmap, srcIdx + i)) {
colDataSetNull_f(pDst->nullbitmap, dstIdx + i);
pDst->hasNull = true;
} else {
colDataClearNull_f(pDst->nullbitmap, dstIdx + i);
}
}
}
} else {
memset(BMCharPos(pDst->nullbitmap, dstIdx), 0, BitmapLen(numOfRows));
}
if (pSrc->pData != NULL) {
memcpy(pDst->pData, pSrc->pData, pSrc->info.bytes * numOfRows);
memcpy(pDst->pData + pDst->info.bytes * dstIdx, pSrc->pData + pSrc->info.bytes * srcIdx, pDst->info.bytes * numOfRows);
}
}

View File

@ -19,6 +19,10 @@
extern "C" {
#endif
#define MJOIN_DEFAULT_BLK_ROWS_NUM 4096
#define MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM (MJOIN_DEFAULT_BLK_ROWS_NUM * 2)
#define MJOIN_HJOIN_CART_THRESHOLD 16
typedef SSDataBlock* (*joinImplFp)(SOperatorInfo*);
typedef enum EJoinTableType {
@ -33,25 +37,51 @@ typedef enum EJoinPhase {
E_JOIN_PHASE_DONE
} EJoinPhase;
typedef struct SMJoinColMap {
int32_t srcSlot;
int32_t dstSlot;
} SMJoinColMap;
typedef struct SMJoinColInfo {
int32_t srcSlot;
int32_t dstSlot;
bool keyCol;
bool vardata;
int32_t* offset;
int32_t bytes;
char* data;
char* bitMap;
} SMJoinColInfo;
typedef struct SMJoinCartGrp {
bool sameTsGrp;
bool firstArrIdx;
bool secondArrIdx;
int32_t firstRowIdx;
int32_t firstRowNum;
int32_t secondRowIdx;
int32_t secondRowNum;
} SMJoinCartGrp;
typedef struct SMJoinCartBlk {
SSDataBlock* pFirstBlk;
SSDataBlock* pSecondBlk;
SArray* pBlkGrps;
} SMJoinCartBlk;
typedef struct SMJoinCartCtx {
bool appendRes;
int32_t resThreshold;
SSDataBlock* pResBlk;
int32_t firstColNum;
SMJoinColInfo* pFirstCols;
SSDataBlock* pFirstBlk;
int32_t firstRowIdx;
int32_t firstRowNum;
SMJoinColMap* pFirstCols;
int32_t secondColNum;
SMJoinColInfo* pSecondCols;
SSDataBlock* pSecondBlk;
int32_t secondRowIdx;
int32_t secondRowNum;
SMJoinColMap* pSecondCols;
SArray* pCartRowIdx;
SArray* pCartBlks;
} SMJoinCartCtx;
typedef struct SMJoinBlkInfo {
@ -72,12 +102,14 @@ typedef struct SMJoinTableInfo {
int32_t blkId;
SQueryStat inputStat;
SMJoinColInfo* primCol;
SMJoinColMap* primCol;
char* primData;
int32_t finNum;
SMJoinColInfo* finCols;
SMJoinColMap* finCols;
int32_t eqNum;
SMJoinColMap* eqCols;
int32_t keyNum;
SMJoinColInfo* keyCols;
@ -117,7 +149,6 @@ typedef struct SBuildGrpResIn {
} SBuildGrpResIn;
typedef struct SBuildGrpResOut {
bool hashJoin;
SSHashObj* pGrpHash;
int32_t grpRowReadIdx;
int32_t grpRowGReadIdx;
@ -138,6 +169,7 @@ typedef struct SProbeGrpResOut {
typedef struct SGrpPairRes {
bool sameTsGrp;
bool finishGrp;
bool hashJoin;
SProbeGrpResIn probeIn;
SBuildGrpResIn buildIn;
@ -151,7 +183,9 @@ typedef struct SGrpPairRes {
#define GRP_PAIR_INIT_SIZE (sizeof(SGrpPairRes) - sizeof(bool) - sizeof(SBuildGrpResOut) - sizeof(SProbeGrpResOut))
typedef struct SMJoinOutputCtx {
bool hashCan;
int32_t grpReadIdx;
int64_t grpCurTs;
SMJoinCartCtx cartCtx;
SArray* pGrpResList;
} SMJoinOutputCtx;
@ -172,9 +206,7 @@ typedef struct SMJoinTableCtx {
} SMJoinTableCtx;
typedef struct SMJoinMergeCtx {
bool hashJoin;
EJoinPhase joinPhase;
int64_t grpCurTs;
SMJoinOutputCtx outputCtx;
SMJoinTsJoinCtx tsJoinCtx;
SMJoinTableCtx buildTbCtx;
@ -223,9 +255,10 @@ typedef struct SMJoinOperatorInfo {
SSDataBlock* pRes;
int32_t pResColNum;
int8_t* pResColMap;
SFilterInfo* pFPreFilter;
SFilterInfo* pPreFilter;
SFilterInfo* pFinFilter;
SMJoinFuncs joinFps;
SMJoinFuncs* joinFps;
SMJoinCtx ctx;
SMJoinExecInfo execInfo;
} SMJoinOperatorInfo;
@ -236,13 +269,18 @@ typedef struct SMJoinOperatorInfo {
#define START_NEW_GRP(_ctx) memset(&(_ctx)->currGrpPair, 0, GRP_PAIR_INIT_SIZE)
#define FIN_SAME_TS_GRP(_ctx, _octx, _done) do { \
if ((_ctx)->inSameTsGrp) { \
(_ctx)->currGrpPair.sameTsGrp = true; \
(_ctx)->currGrpPair.finishGrp = (_done); \
(_ctx)->inSameTsGrp = false; \
(_ctx)->pLastGrpPair = taosArrayPush((_octx)->pGrpResList, &(_ctx)->currGrpPair); \
} \
#define REACH_HJOIN_THRESHOLD(_pair) ((_pair)->buildIn.grpRowNum * (_pair)->probeIn.grpRowNum > MJOIN_HJOIN_CART_THRESHOLD)
#define SET_SAME_TS_GRP_HJOIN(_pair, _octx) ((_pair)->hashJoin = (_octx)->hashCan && REACH_HJOIN_THRESHOLD(_pair))
#define FIN_SAME_TS_GRP(_ctx, _octx, _done) do { \
if ((_ctx)->inSameTsGrp) { \
(_ctx)->currGrpPair.sameTsGrp = true; \
(_ctx)->currGrpPair.finishGrp = (_done); \
SET_SAME_TS_GRP_HJOIN(&(_ctx)->currGrpPair, _octx); \
(_ctx)->inSameTsGrp = false; \
(_ctx)->pLastGrpPair = taosArrayPush((_octx)->pGrpResList, &(_ctx)->currGrpPair); \
} \
} while (0)
#define FIN_DIFF_TS_GRP(_ctx, _octx, _done) do { \
@ -260,6 +298,8 @@ typedef struct SMJoinOperatorInfo {
} \
} while (0)
#define CURRENT_BLK_GRP_ROWS(_in) (((_in)->grpRowNum + (_in)->grpRowBeginIdx) <= (_in)->grpBeginBlk->pBlk->info.rows ? (_in)->grpRowNum : ((_in)->grpBeginBlk->pBlk->info.rows - (_in)->grpRowBeginIdx))
#ifdef __cplusplus
}

View File

@ -211,8 +211,6 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin
default:
break;
}
pInfo->joinFp = (pInfo->subType == JOIN_STYPE_ASOF || pInfo->subType == JOIN_STYPE_WIN) ? mJoinProcessWinJoin: mJoinProcessMergeJoin;
pInfo->pBuild = &pInfo->tbs[buildIdx];
pInfo->pProbe = &pInfo->tbs[probeIdx];
@ -285,9 +283,64 @@ static int32_t mJoinDoRetrieve(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTbCtx
return code;
}
static void mJoinInitJoinCtx(SMJoinOperatorInfo* pJoin) {
blkFetchedFp;
static int32_t mJoinInitMergeCtx(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx;
SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx;
pProbeCtx->type = E_JOIN_TB_PROBE;
pBuildCtx->type = E_JOIN_TB_BUILD;
pCtx->tsJoinCtx.pProbeCtx = pProbeCtx;
pCtx->tsJoinCtx.pBuildCtx = pBuildCtx;
pCtx->joinPhase = E_JOIN_PHASE_RETRIEVE;
pCtx->outputCtx.hashCan = pProbeCtx->pTbInfo->eqNum > 0;
pCtx->outputCtx.pGrpResList = taosArrayInit(MJOIN_DEFAULT_BLK_ROWS_NUM, sizeof(SGrpPairRes));
if (NULL == pCtx->outputCtx.pGrpResList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pJoin->pFPreFilter) {
pCtx->outputCtx.cartCtx.pResBlk = createOneDataBlock(pJoin->pRes);
blockDataEnsureCapacity(pCtx->outputCtx.cartCtx.pResBlk, MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM);
pCtx->outputCtx.cartCtx.resThreshold = MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM * 0.75;
} else {
pCtx->outputCtx.cartCtx.pResBlk = pJoin->pRes;
pCtx->outputCtx.cartCtx.resThreshold = pOperator->resultInfo.threshold;
}
if (!pCtx->outputCtx.hashCan && NULL == pJoin->pFPreFilter) {
pCtx->outputCtx.cartCtx.appendRes = true;
}
pCtx->outputCtx.cartCtx.firstColNum = pProbeCtx->pTbInfo->finNum;
pCtx->outputCtx.cartCtx.pFirstCols = pProbeCtx->pTbInfo->finCols;
pCtx->outputCtx.cartCtx.secondColNum = pBuildCtx->pTbInfo->finNum;
pCtx->outputCtx.cartCtx.pSecondCols = pBuildCtx->pTbInfo->finCols;
pCtx->outputCtx.cartCtx.pCartRowIdx = taosArrayInit(MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM, sizeof(int32_t));
if (NULL == pCtx->outputCtx.cartCtx.pCartRowIdx) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCtx->outputCtx.cartCtx.pCartGrps = taosArrayInit(MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM, sizeof(SMJoinCartGrp));
if (NULL == pCtx->outputCtx.cartCtx.pCartGrps) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
static int32_t mJoinInitCtx(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
pJoin->joinFps = &gMJoinFps[pJoin->joinType][pJoin->subType];
int32_t code = (*pJoin->joinFps->initJoinCtx)(pOperator, pJoin);
if (code) {
return code;
}
return TSDB_CODE_SUCCESS;
}
static void mJoinSetDone(SOperatorInfo* pOperator) {
@ -512,6 +565,7 @@ static bool mLeftJoinSplitGrpImpl(SOperatorInfo* pOperator, SMJoinOperatorInfo*
if (pCtx->pLastGrpPair && pCtx->pLastGrpPair->sameTsGrp
&& pCtx->probeTs[pProbeCtx->blkRowIdx] == pCtx->pLastGrpPair->probeIn.grpLastTs) {
pCtx->pLastGrpPair->probeIn.grpRowNum++;
SET_SAME_TS_GRP_HJOIN(pCtx->pLastGrpPair, pOutCtx);
continue;
}
for (; pBuildCtx->blkIdx < pBuildCtx->blkNum; mJoinBuildMoveToNextBlk(pJoin, pCtx, pBuildCtx, pProbeCtx)) {
@ -612,9 +666,10 @@ static bool mLeftJoinSplitGrp(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoi
}
static void mLeftJoinCart(SMJoinCartCtx* pCtx) {
int32_t currRows = pCtx->pResBlk->info.rows;
int32_t currRows = pCtx->appendRes ? pCtx->pResBlk->info.rows : 0;
for (int32_t c = 0; c < pCtx->firstColNum; ++c) {
SMJoinColInfo* pFirstCol = pCtx->pFirstCols + c;
SMJoinColMap* pFirstCol = pCtx->pFirstCols + c;
SColumnInfoData* pInCol = taosArrayGet(pCtx->pFirstBlk->pDataBlock, pFirstCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pFirstCol->dstSlot);
for (int32_t r = 0; r < pCtx->firstRowNum; ++r) {
@ -625,6 +680,15 @@ static void mLeftJoinCart(SMJoinCartCtx* pCtx) {
}
}
}
for (int32_t c = 0; c < pCtx->secondColNum; ++c) {
SMJoinColMap* pSecondCol = pCtx->pSecondCols + c;
SColumnInfoData* pInCol = taosArrayGet(pCtx->pSecondBlk->pDataBlock, pSecondCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pSecondCol->dstSlot);
for (int32_t r = 0; r < pCtx->firstRowNum; ++r) {
colDataAssignNRows(pOutCol, currRows + r * pCtx->secondRowNum, pInCol, pCtx->secondRowIdx, pCtx->secondRowNum);
}
}
}
static bool mLeftJoinSameTsOutput(SMJoinOperatorInfo* pJoin, SGrpPairRes* pPair) {
@ -651,10 +715,42 @@ static bool mLeftJoinSameTsOutput(SMJoinOperatorInfo* pJoin, SGrpPairRes* pPair)
} while (true);
}
static bool mLeftJoinCartOutput(SMJoinOperatorInfo* pJoin, SMJoinOutputCtx* pCtx) {
bool contLoop = false;
SMJoinCartCtx* pCart = &pCtx->cartCtx;
int32_t grpNum = taosArrayGetSize(pCtx->pGrpResList);
int32_t rowsLeft = pCart->pResBlk pCart->pResBlk->info.rows;
for (; pCtx->grpReadIdx < grpNum; pCtx->grpReadIdx++) {
SGrpPairRes* pPair = taosArrayGet(pCtx->pGrpResList, pCtx->grpReadIdx);
if (!pPair->finishGrp) {
ASSERTS(pCtx->grpReadIdx == grpNum - 1, "unfinished grp not the last");
taosArrayRemoveBatch(pCtx->pGrpResList, 0, pCtx->grpReadIdx - 1, NULL);
pCtx->grpReadIdx = 0;
break;
}
if (pPair->hashJoin) {
contLoop = mLeftJoinHashOutput(pJoin, pPair);
} else if (pCtx->cartCtx.appendRes) {
contLoop = mLeftJoinDirectOutput(pJoin, pPair);
}
if (!contLoop) {
return false;
}
}
}
static bool mLeftJoinOutput(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
SMJoinOutputCtx* pCtx = &pJoin->ctx.mergeCtx.outputCtx;
bool contLoop = false;
int32_t grpNum = taosArrayGetSize(pCtx->pGrpResList);
if (pCtx->cartCtx.appendRes) {
return mLeftJoinCartOutput(pJoin, pCtx);
}
for (; pCtx->grpReadIdx < grpNum; pCtx->grpReadIdx++) {
SGrpPairRes* pPair = taosArrayGet(pCtx->pGrpResList, pCtx->grpReadIdx);
@ -665,9 +761,9 @@ static bool mLeftJoinOutput(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin)
break;
}
if (pPair->sameTsGrp) {
contLoop = mLeftJoinSameTsOutput(pJoin, pPair);
} else {
if (pPair->hashJoin) {
contLoop = mLeftJoinHashOutput(pJoin, pPair);
} else if (pCtx->cartCtx.appendRes) {
contLoop = mLeftJoinDirectOutput(pJoin, pPair);
}
@ -703,23 +799,6 @@ static SSDataBlock* mJoinDoLeftJoin(SOperatorInfo* pOperator, SMJoinOperatorInfo
}
static SSDataBlock* mJoinHanleMergeJoin(SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoin = pOperator->info;
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx;
SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx;
int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pBlock = NULL;
while (true) {
pBlock = (*pJoin->joinFp)(pOperator, pJoin, pCtx, pBuildCtx, pProbeCtx);
if (pBlock && pBlock->info.rows > 0) {
return pBlock;
}
}
}
SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoin = pOperator->info;
if (pOperator->status == OP_EXEC_DONE) {
@ -788,7 +867,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
int32_t numOfCols = 0;
pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
initResultSizeInfo(&pOperator->resultInfo, 4096);
initResultSizeInfo(&pOperator->resultInfo, MJOIN_DEFAULT_BLK_ROWS_NUM);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
@ -798,7 +877,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
mJoinSetBuildAndProbeTable(pInfo, pJoinNode);
mJoinInitJoinCtx(pInfo);
mJoinInitCtx(pOperator, pInfo);
code = mJoinBuildResColMap(pInfo, pJoinNode);
if (code) {
@ -810,6 +889,13 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
goto _error;
}
if (pJoinNode->pFullOnCond != NULL) {
code = filterInitFromNode(pJoinNode->pFullOnCond, &pInfo->pFPreFilter, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
if (pJoinNode->pColOnCond != NULL) {
code = filterInitFromNode(pJoinNode->pColOnCond, &pInfo->pPreFilter, 0);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -1,848 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "operator.h"
#include "os.h"
#include "querynodes.h"
#include "querytask.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "thash.h"
#include "tmsg.h"
#include "ttypes.h"
typedef struct SMJoinRowCtx {
bool rowRemains;
int64_t ts;
SArray* leftRowLocations;
SArray* leftCreatedBlocks;
SArray* rightCreatedBlocks;
int32_t leftRowIdx;
int32_t rightRowIdx;
bool rightUseBuildTable;
SArray* rightRowLocations;
} SMJoinRowCtx;
typedef struct SMJoinOperatorInfo {
SSDataBlock* pRes;
int32_t joinType;
int32_t inputOrder;
bool downstreamInitDone[2];
bool downstreamFetchDone[2];
int16_t downstreamResBlkId[2];
SSDataBlock* pLeft;
int32_t leftPos;
SColumnInfo leftCol;
SSDataBlock* pRight;
int32_t rightPos;
SColumnInfo rightCol;
SNode* pCondAfterMerge;
SNode* pColEqualOnConditions;
SArray* leftEqOnCondCols;
char* leftEqOnCondKeyBuf;
int32_t leftEqOnCondKeyLen;
SArray* rightEqOnCondCols;
char* rightEqOnCondKeyBuf;
int32_t rightEqOnCondKeyLen;
SSHashObj* rightBuildTable;
SMJoinRowCtx rowCtx;
int64_t resRows;
} SMJoinOperatorInfo;
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
static void destroyMergeJoinOperator(void* param);
static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr);
static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr) {
SNode* pPrimKeyCond = pJoinNode->pPrimKeyCond;
if (nodeType(pPrimKeyCond) != QUERY_NODE_OPERATOR) {
qError("not support this in join operator, %s", idStr);
return; // do not handle this
}
SOperatorNode* pNode = (SOperatorNode*)pPrimKeyCond;
SColumnNode* col1 = (SColumnNode*)pNode->pLeft;
SColumnNode* col2 = (SColumnNode*)pNode->pRight;
SColumnNode* leftTsCol = NULL;
SColumnNode* rightTsCol = NULL;
if (col1->dataBlockId == col2->dataBlockId) {
leftTsCol = col1;
rightTsCol = col2;
} else {
if (col1->dataBlockId == pInfo->downstreamResBlkId[0]) {
ASSERT(col2->dataBlockId == pInfo->downstreamResBlkId[1]);
leftTsCol = col1;
rightTsCol = col2;
} else {
ASSERT(col1->dataBlockId == pInfo->downstreamResBlkId[1]);
ASSERT(col2->dataBlockId == pInfo->downstreamResBlkId[0]);
leftTsCol = col2;
rightTsCol = col1;
}
}
setJoinColumnInfo(&pInfo->leftCol, leftTsCol);
setJoinColumnInfo(&pInfo->rightCol, rightTsCol);
}
static void extractEqualOnCondColsFromOper(SMJoinOperatorInfo* pInfo, SOperatorNode* pOperNode,
SColumn* pLeft, SColumn* pRight) {
SColumnNode* pLeftNode = (SColumnNode*)pOperNode->pLeft;
SColumnNode* pRightNode = (SColumnNode*)pOperNode->pRight;
if (pLeftNode->dataBlockId == pRightNode->dataBlockId || pLeftNode->dataBlockId == pInfo->downstreamResBlkId[0]) {
*pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft);
*pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight);
} else {
*pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight);
*pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft);
}
}
static void extractEqualOnCondCols(SMJoinOperatorInfo* pInfo, SNode* pEqualOnCondNode,
SArray* leftTagEqCols, SArray* rightTagEqCols) {
SColumn left = {0};
SColumn right = {0};
if (nodeType(pEqualOnCondNode) == QUERY_NODE_LOGIC_CONDITION && ((SLogicConditionNode*)pEqualOnCondNode)->condType == LOGIC_COND_TYPE_AND) {
SNode* pNode = NULL;
FOREACH(pNode, ((SLogicConditionNode*)pEqualOnCondNode)->pParameterList) {
SOperatorNode* pOperNode = (SOperatorNode*)pNode;
extractEqualOnCondColsFromOper(pInfo, pOperNode, &left, &right);
taosArrayPush(leftTagEqCols, &left);
taosArrayPush(rightTagEqCols, &right);
}
return;
}
if (nodeType(pEqualOnCondNode) == QUERY_NODE_OPERATOR) {
SOperatorNode* pOperNode = (SOperatorNode*)pEqualOnCondNode;
extractEqualOnCondColsFromOper(pInfo, pOperNode, &left, &right);
taosArrayPush(leftTagEqCols, &left);
taosArrayPush(rightTagEqCols, &right);
}
}
static int32_t initTagColskeyBuf(int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i);
(*keyLen) += pCol->bytes; // actual data + null_flag
}
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
(*keyLen) += nullFlagSize;
if (*keyLen >= 0) {
(*keyBuf) = taosMemoryCalloc(1, (*keyLen));
if ((*keyBuf) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t fillKeyBufFromTagCols(SArray* pCols, SSDataBlock* pBlock, int32_t rowIndex, void* pKey) {
SColumnDataAgg* pColAgg = NULL;
size_t numOfGroupCols = taosArrayGetSize(pCols);
char* isNull = (char*)pKey;
char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols;
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = (SColumn*) taosArrayGet(pCols, i);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
// valid range check. todo: return error code.
if (pCol->slotId > taosArrayGetSize(pBlock->pDataBlock)) {
continue;
}
if (pBlock->pBlockAgg != NULL) {
pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
}
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
isNull[i] = 1;
} else {
isNull[i] = 0;
char* val = colDataGetData(pColInfoData, rowIndex);
if (pCol->type == TSDB_DATA_TYPE_JSON) {
int32_t dataLen = getJsonValueLen(val);
memcpy(pStart, val, dataLen);
pStart += dataLen;
} else if (IS_VAR_DATA_TYPE(pCol->type)) {
varDataCopy(pStart, val);
pStart += varDataTLen(val);
} else {
memcpy(pStart, val, pCol->bytes);
pStart += pCol->bytes;
}
}
}
return (int32_t)(pStart - (char*)pKey);
}
SOperatorInfo** buildMergeJoinDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream) {
SOperatorInfo** p = taosMemoryMalloc(2 * POINTER_BYTES);
if (p) {
p[0] = pDownstream[0];
p[1] = pDownstream[0];
pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(p[0], 0);
pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(p[1], 1);
}
return p;
}
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
bool newDownstreams = false;
int32_t code = TSDB_CODE_SUCCESS;
if (pOperator == NULL || pInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
if (1 == numOfDownstream) {
newDownstreams = true;
pDownstream = buildMergeJoinDownstreams(pInfo, pDownstream);
if (NULL == pDownstream) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
numOfDownstream = 2;
} else {
pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(pDownstream[0], 0);
pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(pDownstream[1], 0);
}
int32_t numOfCols = 0;
pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);
initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
extractTimeCondition(pInfo, pJoinNode, GET_TASKID(pTaskInfo));
if (pJoinNode->pFullOnCond != NULL && pJoinNode->node.pConditions != NULL) {
pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
if (pInfo->pCondAfterMerge == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterMerge);
pLogicCond->pParameterList = nodesMakeList();
if (pLogicCond->pParameterList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pFullOnCond));
nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions));
pLogicCond->condType = LOGIC_COND_TYPE_AND;
} else if (pJoinNode->pFullOnCond != NULL) {
pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pFullOnCond);
} else if (pJoinNode->pColEqCond != NULL) {
pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pColEqCond);
} else if (pJoinNode->node.pConditions != NULL) {
pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->node.pConditions);
} else {
pInfo->pCondAfterMerge = NULL;
}
code = filterInitFromNode(pInfo->pCondAfterMerge, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->inputOrder = TSDB_ORDER_ASC;
if (pJoinNode->node.inputTsOrder == ORDER_ASC) {
pInfo->inputOrder = TSDB_ORDER_ASC;
} else if (pJoinNode->node.inputTsOrder == ORDER_DESC) {
pInfo->inputOrder = TSDB_ORDER_DESC;
}
pInfo->pColEqualOnConditions = pJoinNode->pColEqCond;
if (pInfo->pColEqualOnConditions != NULL) {
pInfo->leftEqOnCondCols = taosArrayInit(4, sizeof(SColumn));
pInfo->rightEqOnCondCols = taosArrayInit(4, sizeof(SColumn));
extractEqualOnCondCols(pInfo, pInfo->pColEqualOnConditions, pInfo->leftEqOnCondCols, pInfo->rightEqOnCondCols);
initTagColskeyBuf(&pInfo->leftEqOnCondKeyLen, &pInfo->leftEqOnCondKeyBuf, pInfo->leftEqOnCondCols);
initTagColskeyBuf(&pInfo->rightEqOnCondKeyLen, &pInfo->rightEqOnCondKeyBuf, pInfo->rightEqOnCondCols);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->rightBuildTable = tSimpleHashInit(256, hashFn);
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (newDownstreams) {
taosMemoryFree(pDownstream);
}
pOperator->numOfRealDownstream = newDownstreams ? 1 : 2;
return pOperator;
_error:
if (pInfo != NULL) {
destroyMergeJoinOperator(pInfo);
}
if (newDownstreams) {
taosMemoryFree(pDownstream);
}
taosMemoryFree(pOperator);
pTaskInfo->code = code;
return NULL;
}
void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
pColumn->slotId = pColumnNode->slotId;
pColumn->type = pColumnNode->node.resType.type;
pColumn->bytes = pColumnNode->node.resType.bytes;
pColumn->precision = pColumnNode->node.resType.precision;
pColumn->scale = pColumnNode->node.resType.scale;
}
static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) {
void* p = NULL;
int32_t iter = 0;
while ((p = tSimpleHashIterate(pBuildTable, p, &iter)) != NULL) {
SArray* rows = (*(SArray**)p);
taosArrayDestroy(rows);
}
tSimpleHashCleanup(pBuildTable);
}
void destroyMergeJoinOperator(void* param) {
SMJoinOperatorInfo* pJoinOperator = (SMJoinOperatorInfo*)param;
if (pJoinOperator->pColEqualOnConditions != NULL) {
mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable);
taosMemoryFreeClear(pJoinOperator->rightEqOnCondKeyBuf);
taosArrayDestroy(pJoinOperator->rightEqOnCondCols);
taosMemoryFreeClear(pJoinOperator->leftEqOnCondKeyBuf);
taosArrayDestroy(pJoinOperator->leftEqOnCondCols);
}
nodesDestroyNode(pJoinOperator->pCondAfterMerge);
taosArrayDestroy(pJoinOperator->rowCtx.leftCreatedBlocks);
taosArrayDestroy(pJoinOperator->rowCtx.rightCreatedBlocks);
taosArrayDestroy(pJoinOperator->rowCtx.leftRowLocations);
taosArrayDestroy(pJoinOperator->rowCtx.rightRowLocations);
pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes);
taosMemoryFreeClear(param);
}
static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow,
SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock,
int32_t rightPos) {
SMJoinOperatorInfo* pJoinInfo = pOperator->info;
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[i];
int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;
int32_t rowIndex = -1;
SColumnInfoData* pSrc = NULL;
if (pLeftBlock->info.id.blockId == blockId) {
pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId);
rowIndex = leftPos;
} else {
pSrc = taosArrayGet(pRightBlock->pDataBlock, slotId);
rowIndex = rightPos;
}
if (colDataIsNull_s(pSrc, rowIndex)) {
colDataSetNULL(pDst, currRow);
} else {
char* p = colDataGetData(pSrc, rowIndex);
colDataSetVal(pDst, currRow, p, false);
}
}
}
typedef struct SRowLocation {
SSDataBlock* pDataBlock;
int32_t pos;
} SRowLocation;
// pBlock[tsSlotId][startPos, endPos) == timestamp,
static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp,
int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) {
int32_t numRows = pBlock->info.rows;
ASSERT(startPos < numRows);
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId);
int32_t i = startPos;
for (; i < numRows; ++i) {
char* pNextVal = colDataGetData(pCol, i);
if (timestamp != *(int64_t*)pNextVal) {
break;
}
}
int32_t endPos = i;
*pEndPos = endPos;
if (endPos - startPos == 0) {
return 0;
}
SSDataBlock* block = pBlock;
bool createdNewBlock = false;
if (endPos == numRows) {
block = blockDataExtractBlock(pBlock, startPos, endPos - startPos);
taosArrayPush(createdBlocks, &block);
createdNewBlock = true;
}
SRowLocation location = {0};
for (int32_t j = startPos; j < endPos; ++j) {
location.pDataBlock = block;
location.pos = (createdNewBlock ? j - startPos : j);
taosArrayPush(rowLocations, &location);
}
return 0;
}
// whichChild == 0, left child of join; whichChild ==1, right child of join
static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator, int32_t whichChild, int16_t tsSlotId,
SSDataBlock* startDataBlock, int32_t startPos,
int64_t timestamp, SArray* rowLocations,
SArray* createdBlocks) {
ASSERT(whichChild == 0 || whichChild == 1);
SMJoinOperatorInfo* pJoinInfo = pOperator->info;
int32_t endPos = -1;
SSDataBlock* dataBlock = startDataBlock;
mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks);
while (endPos == dataBlock->info.rows) {
SOperatorInfo* ds = pOperator->pDownstream[whichChild];
dataBlock = getNextBlockFromDownstreamRemain(pOperator, whichChild);
qError("merge join %s got block for same ts, rows:%" PRId64, whichChild == 0 ? "left" : "right", dataBlock ? dataBlock->info.rows : 0);
if (whichChild == 0) {
pJoinInfo->leftPos = 0;
pJoinInfo->pLeft = dataBlock;
} else if (whichChild == 1) {
pJoinInfo->rightPos = 0;
pJoinInfo->pRight = dataBlock;
}
if (dataBlock == NULL) {
pJoinInfo->downstreamFetchDone[whichChild] = true;
endPos = -1;
break;
}
mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, 0, timestamp, &endPos, rowLocations, createdBlocks);
}
if (endPos != -1) {
if (whichChild == 0) {
pJoinInfo->leftPos = endPos;
} else if (whichChild == 1) {
pJoinInfo->rightPos = endPos;
}
}
return 0;
}
static int32_t mergeJoinFillBuildTable(SMJoinOperatorInfo* pInfo, SArray* rightRowLocations) {
for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) {
SRowLocation* rightRow = taosArrayGet(rightRowLocations, i);
int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightEqOnCondCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightEqOnCondKeyBuf);
SArray** ppRows = tSimpleHashGet(pInfo->rightBuildTable, pInfo->rightEqOnCondKeyBuf, keyLen);
if (!ppRows) {
SArray* rows = taosArrayInit(4, sizeof(SRowLocation));
taosArrayPush(rows, rightRow);
tSimpleHashPut(pInfo->rightBuildTable, pInfo->rightEqOnCondKeyBuf, keyLen, &rows, POINTER_BYTES);
} else {
taosArrayPush(*ppRows, rightRow);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t* nRows,
const SArray* leftRowLocations, int32_t leftRowIdx,
int32_t rightRowIdx, bool useBuildTableTSRange, SArray* rightRowLocations, bool* pReachThreshold) {
*pReachThreshold = false;
uint32_t limitRowNum = pOperator->resultInfo.threshold;
SMJoinOperatorInfo* pJoinInfo = pOperator->info;
size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
int32_t i,j;
for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) {
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
SArray* pRightRows = NULL;
if (useBuildTableTSRange) {
int32_t keyLen = fillKeyBufFromTagCols(pJoinInfo->leftEqOnCondCols, leftRow->pDataBlock, leftRow->pos, pJoinInfo->leftEqOnCondKeyBuf);
SArray** ppRightRows = tSimpleHashGet(pJoinInfo->rightBuildTable, pJoinInfo->leftEqOnCondKeyBuf, keyLen);
if (!ppRightRows) {
continue;
}
pRightRows = *ppRightRows;
} else {
pRightRows = rightRowLocations;
}
size_t rightRowsSize = taosArrayGetSize(pRightRows);
for (j = rightRowIdx; j < rightRowsSize; ++j) {
if (*nRows >= limitRowNum) {
*pReachThreshold = true;
break;
}
SRowLocation* rightRow = taosArrayGet(pRightRows, j);
mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
rightRow->pos);
++*nRows;
}
if (*pReachThreshold) {
break;
}
}
if (*pReachThreshold) {
pJoinInfo->rowCtx.rowRemains = true;
pJoinInfo->rowCtx.leftRowIdx = i;
pJoinInfo->rowCtx.rightRowIdx = j;
}
return TSDB_CODE_SUCCESS;
}
static void mergeJoinDestroyTSRangeCtx(SMJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks,
SArray* rightCreatedBlocks, bool rightUseBuildTable, SArray* rightRowLocations) {
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
blockDataDestroy(pBlock);
}
taosArrayDestroy(rightCreatedBlocks);
for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
blockDataDestroy(pBlock);
}
if (rightRowLocations != NULL) {
taosArrayDestroy(rightRowLocations);
}
if (rightUseBuildTable) {
void* p = NULL;
int32_t iter = 0;
while ((p = tSimpleHashIterate(pJoinInfo->rightBuildTable, p, &iter)) != NULL) {
SArray* rows = (*(SArray**)p);
taosArrayDestroy(rows);
}
tSimpleHashClear(pJoinInfo->rightBuildTable);
}
taosArrayDestroy(leftCreatedBlocks);
taosArrayDestroy(leftRowLocations);
pJoinInfo->rowCtx.rowRemains = false;
pJoinInfo->rowCtx.leftRowLocations = NULL;
pJoinInfo->rowCtx.leftCreatedBlocks = NULL;
pJoinInfo->rowCtx.rightCreatedBlocks = NULL;
pJoinInfo->rowCtx.rightUseBuildTable = false;
pJoinInfo->rowCtx.rightRowLocations = NULL;
}
static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes,
int32_t* nRows) {
int32_t code = TSDB_CODE_SUCCESS;
SMJoinOperatorInfo* pJoinInfo = pOperator->info;
SArray* leftRowLocations = NULL;
SArray* rightRowLocations = NULL;
SArray* leftCreatedBlocks = NULL;
SArray* rightCreatedBlocks = NULL;
int32_t leftRowIdx = 0;
int32_t rightRowIdx = 0;
SSHashObj* rightTableHash = NULL;
bool rightUseBuildTable = false;
if (pJoinInfo->rowCtx.rowRemains) {
leftRowLocations = pJoinInfo->rowCtx.leftRowLocations;
leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks;
rightUseBuildTable = pJoinInfo->rowCtx.rightUseBuildTable;
rightRowLocations = pJoinInfo->rowCtx.rightRowLocations;
rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks;
leftRowIdx = pJoinInfo->rowCtx.leftRowIdx;
rightRowIdx = pJoinInfo->rowCtx.rightRowIdx;
} else {
leftRowLocations = taosArrayInit(8, sizeof(SRowLocation));
leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
if (pJoinInfo->pColEqualOnConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) {
mergeJoinFillBuildTable(pJoinInfo, rightRowLocations);
rightUseBuildTable = true;
taosArrayDestroy(rightRowLocations);
rightRowLocations = NULL;
}
}
size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.threshold);
if (code != TSDB_CODE_SUCCESS) {
qError("%s can not ensure block capacity for join. left: %zu", GET_TASKID(pOperator->pTaskInfo),
leftNumJoin);
}
bool reachThreshold = false;
if (code == TSDB_CODE_SUCCESS) {
mergeJoinLeftRowsRightRows(pOperator, pRes, nRows, leftRowLocations, leftRowIdx,
rightRowIdx, rightUseBuildTable, rightRowLocations, &reachThreshold);
}
if (!reachThreshold) {
mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks,
rightUseBuildTable, rightRowLocations);
} else {
pJoinInfo->rowCtx.rowRemains = true;
pJoinInfo->rowCtx.ts = timestamp;
pJoinInfo->rowCtx.leftRowLocations = leftRowLocations;
pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks;
pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks;
pJoinInfo->rowCtx.rightUseBuildTable = rightUseBuildTable;
pJoinInfo->rowCtx.rightRowLocations = rightRowLocations;
}
return TSDB_CODE_SUCCESS;
}
static void setMergeJoinDone(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
if (pOperator->pDownstreamGetParams) {
freeOperatorParam(pOperator->pDownstreamGetParams[0], OP_GET_PARAM);
freeOperatorParam(pOperator->pDownstreamGetParams[1], OP_GET_PARAM);
pOperator->pDownstreamGetParams[0] = NULL;
pOperator->pDownstreamGetParams[1] = NULL;
}
}
static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) {
SMJoinOperatorInfo* pJoinInfo = pOperator->info;
bool leftEmpty = false;
if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
if (!pJoinInfo->downstreamFetchDone[0]) {
pJoinInfo->pLeft = getNextBlockFromDownstreamRemain(pOperator, 0);
pJoinInfo->downstreamInitDone[0] = true;
pJoinInfo->leftPos = 0;
qError("merge join left got block, rows:%" PRId64, pJoinInfo->pLeft ? pJoinInfo->pLeft->info.rows : 0);
} else {
pJoinInfo->pLeft = NULL;
}
if (pJoinInfo->pLeft == NULL) {
if (pOperator->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstream && !pJoinInfo->downstreamInitDone[1]) {
leftEmpty = true;
} else {
setMergeJoinDone(pOperator);
return false;
}
}
}
if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
if (!pJoinInfo->downstreamFetchDone[1]) {
pJoinInfo->pRight = getNextBlockFromDownstreamRemain(pOperator, 1);
pJoinInfo->downstreamInitDone[1] = true;
pJoinInfo->rightPos = 0;
qError("merge join right got block, rows:%" PRId64, pJoinInfo->pRight ? pJoinInfo->pRight->info.rows : 0);
} else {
pJoinInfo->pRight = NULL;
}
if (pJoinInfo->pRight == NULL) {
setMergeJoinDone(pOperator);
return false;
} else {
if (leftEmpty) {
setMergeJoinDone(pOperator);
return false;
}
}
}
if (NULL == pJoinInfo->pLeft || NULL == pJoinInfo->pRight) {
setMergeJoinDone(pOperator);
return false;
}
// only the timestamp match support for ordinary table
SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos);
*pLeftTs = *(int64_t*)pLeftVal;
SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId);
char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos);
*pRightTs = *(int64_t*)pRightVal;
return true;
}
static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
SMJoinOperatorInfo* pJoinInfo = pOperator->info;
int32_t nrows = pRes->info.rows;
bool asc = (pJoinInfo->inputOrder == TSDB_ORDER_ASC) ? true : false;
while (1) {
int64_t leftTs = 0;
int64_t rightTs = 0;
if (pJoinInfo->rowCtx.rowRemains) {
leftTs = pJoinInfo->rowCtx.ts;
rightTs = pJoinInfo->rowCtx.ts;
} else {
bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs);
if (!hasNextTs) {
break;
}
}
if (leftTs == rightTs) {
mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows);
} else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) {
pJoinInfo->leftPos += 1;
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) {
continue;
}
} else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) {
pJoinInfo->rightPos += 1;
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) {
continue;
}
}
// the pDataBlock are always the same one, no need to call this again
pRes->info.rows = nrows;
pRes->info.dataLoad = 1;
pRes->info.scanFlag = MAIN_SCAN;
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break;
}
}
}
void resetMergeJoinOperator(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoinInfo = pOperator->info;
if (pJoinInfo->rowCtx.rowRemains) {
mergeJoinDestroyTSRangeCtx(pJoinInfo, pJoinInfo->rowCtx.leftRowLocations, pJoinInfo->rowCtx.leftCreatedBlocks, pJoinInfo->rowCtx.rightCreatedBlocks,
pJoinInfo->rowCtx.rightUseBuildTable, pJoinInfo->rowCtx.rightRowLocations);
}
pJoinInfo->pLeft = NULL;
pJoinInfo->leftPos = 0;
pJoinInfo->pRight = NULL;
pJoinInfo->rightPos = 0;
pJoinInfo->downstreamFetchDone[0] = false;
pJoinInfo->downstreamFetchDone[1] = false;
pJoinInfo->downstreamInitDone[0] = false;
pJoinInfo->downstreamInitDone[1] = false;
pJoinInfo->resRows = 0;
pOperator->status = OP_OPENED;
}
SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoinInfo = pOperator->info;
if (pOperator->status == OP_EXEC_DONE) {
if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] || NULL == pOperator->pDownstreamGetParams[1]) {
qError("total merge join res rows:%" PRId64, pJoinInfo->resRows);
return NULL;
} else {
resetMergeJoinOperator(pOperator);
qError("start new merge join");
}
}
int64_t st = 0;
if (pOperator->cost.openCost == 0) {
st = taosGetTimestampUs();
}
SSDataBlock* pRes = pJoinInfo->pRes;
blockDataCleanup(pRes);
while (true) {
int32_t numOfRowsBefore = pRes->info.rows;
doMergeJoinImpl(pOperator, pRes);
int32_t numOfNewRows = pRes->info.rows - numOfRowsBefore;
if (numOfNewRows == 0) {
break;
}
if (pOperator->exprSupp.pFilterInfo != NULL) {
doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
}
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break;
}
if (pOperator->status == OP_EXEC_DONE) {
break;
}
}
if (pOperator->cost.openCost == 0) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
}
if (pRes->info.rows > 0) {
pJoinInfo->resRows += pRes->info.rows;
qError("merge join returns res rows:%" PRId64, pRes->info.rows);
return pRes;
} else {
qError("total merge join res rows:%" PRId64, pJoinInfo->resRows);
return NULL;
}
}