From d7535f832a34fb35ab72d7a8c79b7368ecaa061e Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 7 Dec 2023 19:22:46 +0800 Subject: [PATCH] enh: support cart in join --- include/common/tdatablock.h | 1 + source/common/src/tdatablock.c | 111 ++- source/libs/executor/inc/mergejoin.h | 84 +- source/libs/executor/src/mergejoinoperator.c | 142 ++- .../libs/executor/src/mergejoinoperator_old.c | 848 ------------------ 5 files changed, 274 insertions(+), 912 deletions(-) delete mode 100755 source/libs/executor/src/mergejoinoperator_old.c diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index b2aff6fd7e..221dc3cd1f 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -34,6 +34,7 @@ typedef struct SBlockOrderInfo { #define NBIT (3u) #define BitPos(_n) ((_n) & ((1 << NBIT) - 1)) +#define CharPos(r_) ((r_) >> NBIT) #define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT]) #define colDataIsNull_f(bm_, r_) ((BMCharPos(bm_, r_) & (1u << (7u - BitPos(r_)))) == (1u << (7u - BitPos(r_)))) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 87d49cd902..d898c69eb0 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -480,7 +480,7 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p } int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx, int32_t numOfRows) { - if (pDst->info.type != pSrc->info.type || pSrc->reassigned) { + if (pDst->info.type != pSrc->info.type || pDst->info.bytes != pSrc->info.bytes || pSrc->reassigned) { return TSDB_CODE_FAILED; } @@ -489,25 +489,108 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI } if (IS_VAR_DATA_TYPE(pDst->info.type)) { - memcpy(pDst->varmeta.offset, pSrc->varmeta.offset, sizeof(int32_t) * numOfRows); - if (pDst->varmeta.allocLen < pSrc->varmeta.length) { - char* tmp = taosMemoryRealloc(pDst->pData, pSrc->varmeta.length); - 
if (tmp == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } + int32_t allLen = 0; + if (pSrc->hasNull) { + for (int32_t i = 0; i < numOfRows; ++i) { + if (colDataIsNull_var(pSrc, srcIdx + i)) { + pDst->varmeta.offset[dstIdx + i] = -1; + pDst->hasNull = true; + continue; + } - pDst->pData = tmp; - pDst->varmeta.allocLen = pSrc->varmeta.length; + char* pData = colDataGetVarData(pSrc, srcIdx + i); + int32_t dataLen = 0; + if (pSrc->info.type == TSDB_DATA_TYPE_JSON) { + dataLen = getJsonValueLen(pData); + } else { + dataLen = varDataTLen(pData); + } + pDst->varmeta.offset[dstIdx + i] = pDst->varmeta.length + allLen; + allLen += dataLen; + } + } else { + for (int32_t i = 0; i < numOfRows; ++i) { + char* pData = colDataGetVarData(pSrc, srcIdx + i); + int32_t dataLen = 0; + if (pSrc->info.type == TSDB_DATA_TYPE_JSON) { + dataLen = getJsonValueLen(pData); + } else { + dataLen = varDataTLen(pData); + } + pDst->varmeta.offset[dstIdx + i] = pDst->varmeta.length + allLen; + allLen += dataLen; + } } - pDst->varmeta.length = pSrc->varmeta.length; - if (pDst->pData != NULL && pSrc->pData != NULL) { - memcpy(pDst->pData, pSrc->pData, pSrc->varmeta.length); + if (allLen > 0) { + // copy data + if (pDst->varmeta.allocLen < pDst->varmeta.length + allLen) { + char* tmp = taosMemoryRealloc(pDst->pData, pDst->varmeta.length + allLen); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pDst->pData = tmp; + pDst->varmeta.allocLen = pDst->varmeta.length + allLen; + } + + memcpy(pDst->pData + pDst->varmeta.length, pSrc->pData, allLen); + pDst->varmeta.length = pDst->varmeta.length + allLen; } } else { - memcpy(pDst->nullbitmap, pSrc->nullbitmap, BitmapLen(numOfRows)); + if (pSrc->hasNull) { + if (0 == BitPos(dstIdx) && 0 == BitPos(srcIdx)) { + memcpy(BMCharPos(pDst->nullbitmap, dstIdx), BMCharPos(pSrc->nullbitmap, srcIdx), BitmapLen(numOfRows)); + if (!pDst->hasNull) { + int32_t nullBytes = BitmapLen(numOfRows); + int32_t startPos = CharPos(dstIdx); + for (int32_t i = 0; i < 
nullBytes; ++i) { + if (pDst->nullbitmap[startPos + i]) { + pDst->hasNull = true; + break; + } + } + } + } else if (BitPos(dstIdx) == BitPos(srcIdx)) { + for (int32_t i = 0; i < numOfRows; ++i) { + if (0 == BitPos(dstIdx)) { + memcpy(BMCharPos(pDst->nullbitmap, dstIdx + i), BMCharPos(pSrc->nullbitmap, srcIdx + i), BitmapLen(numOfRows - i)); + if (!pDst->hasNull) { + int32_t nullBytes = BitmapLen(numOfRows - i); + int32_t startPos = CharPos(dstIdx + i); + for (int32_t m = 0; m < nullBytes; ++m) { + if (pDst->nullbitmap[startPos + m]) { + pDst->hasNull = true; + break; + } + } + } + break; + } else { + if (colDataIsNull_f(pSrc->nullbitmap, srcIdx + i)) { + colDataSetNull_f(pDst->nullbitmap, dstIdx + i); + pDst->hasNull = true; + } else { + colDataClearNull_f(pDst->nullbitmap, dstIdx + i); + } + } + } + } else { + for (int32_t i = 0; i < numOfRows; ++i) { + if (colDataIsNull_f(pSrc->nullbitmap, srcIdx + i)) { + colDataSetNull_f(pDst->nullbitmap, dstIdx + i); + pDst->hasNull = true; + } else { + colDataClearNull_f(pDst->nullbitmap, dstIdx + i); + } + } + } + } else { + memset(BMCharPos(pDst->nullbitmap, dstIdx), 0, BitmapLen(numOfRows)); + } + if (pSrc->pData != NULL) { - memcpy(pDst->pData, pSrc->pData, pSrc->info.bytes * numOfRows); + memcpy(pDst->pData + pDst->info.bytes * dstIdx, pSrc->pData + pSrc->info.bytes * srcIdx, pDst->info.bytes * numOfRows); } } diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 8ee5bf8301..918df31f00 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -19,6 +19,10 @@ extern "C" { #endif +#define MJOIN_DEFAULT_BLK_ROWS_NUM 4096 +#define MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM (MJOIN_DEFAULT_BLK_ROWS_NUM * 2) +#define MJOIN_HJOIN_CART_THRESHOLD 16 + typedef SSDataBlock* (*joinImplFp)(SOperatorInfo*); typedef enum EJoinTableType { @@ -33,25 +37,51 @@ typedef enum EJoinPhase { E_JOIN_PHASE_DONE } EJoinPhase; +typedef struct SMJoinColMap { + int32_t srcSlot; + 
int32_t dstSlot; +} SMJoinColMap; + typedef struct SMJoinColInfo { int32_t srcSlot; int32_t dstSlot; + bool keyCol; + bool vardata; + int32_t* offset; + int32_t bytes; + char* data; + char* bitMap; } SMJoinColInfo; +typedef struct SMJoinCartGrp { + bool sameTsGrp; + bool firstArrIdx; + bool secondArrIdx; + int32_t firstRowIdx; + int32_t firstRowNum; + int32_t secondRowIdx; + int32_t secondRowNum; +} SMJoinCartGrp; + +typedef struct SMJoinCartBlk { + SSDataBlock* pFirstBlk; + SSDataBlock* pSecondBlk; + SArray* pBlkGrps; +} SMJoinCartBlk; + + typedef struct SMJoinCartCtx { + bool appendRes; + int32_t resThreshold; SSDataBlock* pResBlk; int32_t firstColNum; - SMJoinColInfo* pFirstCols; - SSDataBlock* pFirstBlk; - int32_t firstRowIdx; - int32_t firstRowNum; - + SMJoinColMap* pFirstCols; int32_t secondColNum; - SMJoinColInfo* pSecondCols; - SSDataBlock* pSecondBlk; - int32_t secondRowIdx; - int32_t secondRowNum; + SMJoinColMap* pSecondCols; + + SArray* pCartRowIdx; + SArray* pCartBlks; } SMJoinCartCtx; typedef struct SMJoinBlkInfo { @@ -72,12 +102,14 @@ typedef struct SMJoinTableInfo { int32_t blkId; SQueryStat inputStat; - SMJoinColInfo* primCol; + SMJoinColMap* primCol; char* primData; int32_t finNum; - SMJoinColInfo* finCols; + SMJoinColMap* finCols; + int32_t eqNum; + SMJoinColMap* eqCols; int32_t keyNum; SMJoinColInfo* keyCols; @@ -117,7 +149,6 @@ typedef struct SBuildGrpResIn { } SBuildGrpResIn; typedef struct SBuildGrpResOut { - bool hashJoin; SSHashObj* pGrpHash; int32_t grpRowReadIdx; int32_t grpRowGReadIdx; @@ -138,6 +169,7 @@ typedef struct SProbeGrpResOut { typedef struct SGrpPairRes { bool sameTsGrp; bool finishGrp; + bool hashJoin; SProbeGrpResIn probeIn; SBuildGrpResIn buildIn; @@ -151,7 +183,9 @@ typedef struct SGrpPairRes { #define GRP_PAIR_INIT_SIZE (sizeof(SGrpPairRes) - sizeof(bool) - sizeof(SBuildGrpResOut) - sizeof(SProbeGrpResOut)) typedef struct SMJoinOutputCtx { + bool hashCan; int32_t grpReadIdx; + int64_t grpCurTs; SMJoinCartCtx cartCtx; 
SArray* pGrpResList; } SMJoinOutputCtx; @@ -172,9 +206,7 @@ typedef struct SMJoinTableCtx { } SMJoinTableCtx; typedef struct SMJoinMergeCtx { - bool hashJoin; EJoinPhase joinPhase; - int64_t grpCurTs; SMJoinOutputCtx outputCtx; SMJoinTsJoinCtx tsJoinCtx; SMJoinTableCtx buildTbCtx; @@ -223,9 +255,10 @@ typedef struct SMJoinOperatorInfo { SSDataBlock* pRes; int32_t pResColNum; int8_t* pResColMap; + SFilterInfo* pFPreFilter; SFilterInfo* pPreFilter; SFilterInfo* pFinFilter; - SMJoinFuncs joinFps; + SMJoinFuncs* joinFps; SMJoinCtx ctx; SMJoinExecInfo execInfo; } SMJoinOperatorInfo; @@ -236,13 +269,18 @@ typedef struct SMJoinOperatorInfo { #define START_NEW_GRP(_ctx) memset(&(_ctx)->currGrpPair, 0, GRP_PAIR_INIT_SIZE) -#define FIN_SAME_TS_GRP(_ctx, _octx, _done) do { \ - if ((_ctx)->inSameTsGrp) { \ - (_ctx)->currGrpPair.sameTsGrp = true; \ - (_ctx)->currGrpPair.finishGrp = (_done); \ - (_ctx)->inSameTsGrp = false; \ - (_ctx)->pLastGrpPair = taosArrayPush((_octx)->pGrpResList, &(_ctx)->currGrpPair); \ - } \ +#define REACH_HJOIN_THRESHOLD(_pair) ((_pair)->buildIn.grpRowNum * (_pair)->probeIn.grpRowNum > MJOIN_HJOIN_CART_THRESHOLD) + +#define SET_SAME_TS_GRP_HJOIN(_pair, _octx) ((_pair)->hashJoin = (_octx)->hashCan && REACH_HJOIN_THRESHOLD(_pair)) + +#define FIN_SAME_TS_GRP(_ctx, _octx, _done) do { \ + if ((_ctx)->inSameTsGrp) { \ + (_ctx)->currGrpPair.sameTsGrp = true; \ + (_ctx)->currGrpPair.finishGrp = (_done); \ + SET_SAME_TS_GRP_HJOIN(&(_ctx)->currGrpPair, _octx); \ + (_ctx)->inSameTsGrp = false; \ + (_ctx)->pLastGrpPair = taosArrayPush((_octx)->pGrpResList, &(_ctx)->currGrpPair); \ + } \ } while (0) #define FIN_DIFF_TS_GRP(_ctx, _octx, _done) do { \ @@ -260,6 +298,8 @@ typedef struct SMJoinOperatorInfo { } \ } while (0) +#define CURRENT_BLK_GRP_ROWS(_in) (((_in)->grpRowNum + (_in)->grpRowBeginIdx) <= (_in)->grpBeginBlk->pBlk->info.rows ? 
(_in)->grpRowNum : ((_in)->grpBeginBlk->pBlk->info.rows - (_in)->grpRowBeginIdx)) + #ifdef __cplusplus } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index a220dce31d..65ea024e61 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -211,8 +211,6 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin default: break; } - - pInfo->joinFp = (pInfo->subType == JOIN_STYPE_ASOF || pInfo->subType == JOIN_STYPE_WIN) ? mJoinProcessWinJoin: mJoinProcessMergeJoin; pInfo->pBuild = &pInfo->tbs[buildIdx]; pInfo->pProbe = &pInfo->tbs[probeIdx]; @@ -285,9 +283,64 @@ static int32_t mJoinDoRetrieve(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTbCtx return code; } -static void mJoinInitJoinCtx(SMJoinOperatorInfo* pJoin) { - blkFetchedFp; +static int32_t mJoinInitMergeCtx(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { + SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; + SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx; + SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx; + pProbeCtx->type = E_JOIN_TB_PROBE; + pBuildCtx->type = E_JOIN_TB_BUILD; + + pCtx->tsJoinCtx.pProbeCtx = pProbeCtx; + pCtx->tsJoinCtx.pBuildCtx = pBuildCtx; + + pCtx->joinPhase = E_JOIN_PHASE_RETRIEVE; + + pCtx->outputCtx.hashCan = pProbeCtx->pTbInfo->eqNum > 0; + pCtx->outputCtx.pGrpResList = taosArrayInit(MJOIN_DEFAULT_BLK_ROWS_NUM, sizeof(SGrpPairRes)); + if (NULL == pCtx->outputCtx.pGrpResList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + if (pJoin->pFPreFilter) { + pCtx->outputCtx.cartCtx.pResBlk = createOneDataBlock(pJoin->pRes); + blockDataEnsureCapacity(pCtx->outputCtx.cartCtx.pResBlk, MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM); + pCtx->outputCtx.cartCtx.resThreshold = MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM * 0.75; + } else { + pCtx->outputCtx.cartCtx.pResBlk = pJoin->pRes; + pCtx->outputCtx.cartCtx.resThreshold = pOperator->resultInfo.threshold; + } + + if (!pCtx->outputCtx.hashCan && 
NULL == pJoin->pFPreFilter) { + pCtx->outputCtx.cartCtx.appendRes = true; + } + + pCtx->outputCtx.cartCtx.firstColNum = pProbeCtx->pTbInfo->finNum; + pCtx->outputCtx.cartCtx.pFirstCols = pProbeCtx->pTbInfo->finCols; + pCtx->outputCtx.cartCtx.secondColNum = pBuildCtx->pTbInfo->finNum; + pCtx->outputCtx.cartCtx.pSecondCols = pBuildCtx->pTbInfo->finCols; + + pCtx->outputCtx.cartCtx.pCartRowIdx = taosArrayInit(MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM, sizeof(int32_t)); + if (NULL == pCtx->outputCtx.cartCtx.pCartRowIdx) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pCtx->outputCtx.cartCtx.pCartGrps = taosArrayInit(MJOIN_DEFAULT_BUFF_BLK_ROWS_NUM, sizeof(SMJoinCartGrp)); + if (NULL == pCtx->outputCtx.cartCtx.pCartGrps) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t mJoinInitCtx(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { + pJoin->joinFps = &gMJoinFps[pJoin->joinType][pJoin->subType]; + + int32_t code = (*pJoin->joinFps->initJoinCtx)(pOperator, pJoin); + if (code) { + return code; + } + + return TSDB_CODE_SUCCESS; } static void mJoinSetDone(SOperatorInfo* pOperator) { @@ -512,6 +565,7 @@ static bool mLeftJoinSplitGrpImpl(SOperatorInfo* pOperator, SMJoinOperatorInfo* if (pCtx->pLastGrpPair && pCtx->pLastGrpPair->sameTsGrp && pCtx->probeTs[pProbeCtx->blkRowIdx] == pCtx->pLastGrpPair->probeIn.grpLastTs) { pCtx->pLastGrpPair->probeIn.grpRowNum++; + SET_SAME_TS_GRP_HJOIN(pCtx->pLastGrpPair, pOutCtx); continue; } for (; pBuildCtx->blkIdx < pBuildCtx->blkNum; mJoinBuildMoveToNextBlk(pJoin, pCtx, pBuildCtx, pProbeCtx)) { @@ -612,9 +666,10 @@ static bool mLeftJoinSplitGrp(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoi } static void mLeftJoinCart(SMJoinCartCtx* pCtx) { - int32_t currRows = pCtx->pResBlk->info.rows; + int32_t currRows = pCtx->appendRes ? 
pCtx->pResBlk->info.rows : 0; + for (int32_t c = 0; c < pCtx->firstColNum; ++c) { - SMJoinColInfo* pFirstCol = pCtx->pFirstCols + c; + SMJoinColMap* pFirstCol = pCtx->pFirstCols + c; SColumnInfoData* pInCol = taosArrayGet(pCtx->pFirstBlk->pDataBlock, pFirstCol->srcSlot); SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pFirstCol->dstSlot); for (int32_t r = 0; r < pCtx->firstRowNum; ++r) { @@ -625,6 +680,15 @@ static void mLeftJoinCart(SMJoinCartCtx* pCtx) { } } } + + for (int32_t c = 0; c < pCtx->secondColNum; ++c) { + SMJoinColMap* pSecondCol = pCtx->pSecondCols + c; + SColumnInfoData* pInCol = taosArrayGet(pCtx->pSecondBlk->pDataBlock, pSecondCol->srcSlot); + SColumnInfoData* pOutCol = taosArrayGet(pCtx->pResBlk->pDataBlock, pSecondCol->dstSlot); + for (int32_t r = 0; r < pCtx->firstRowNum; ++r) { + colDataAssignNRows(pOutCol, currRows + r * pCtx->secondRowNum, pInCol, pCtx->secondRowIdx, pCtx->secondRowNum); + } + } } static bool mLeftJoinSameTsOutput(SMJoinOperatorInfo* pJoin, SGrpPairRes* pPair) { @@ -651,10 +715,42 @@ static bool mLeftJoinSameTsOutput(SMJoinOperatorInfo* pJoin, SGrpPairRes* pPair) } while (true); } +static bool mLeftJoinCartOutput(SMJoinOperatorInfo* pJoin, SMJoinOutputCtx* pCtx) { + bool contLoop = false; + SMJoinCartCtx* pCart = &pCtx->cartCtx; + int32_t grpNum = taosArrayGetSize(pCtx->pGrpResList); + int32_t rowsLeft = pCart->pResBlk pCart->pResBlk->info.rows; + + for (; pCtx->grpReadIdx < grpNum; pCtx->grpReadIdx++) { + SGrpPairRes* pPair = taosArrayGet(pCtx->pGrpResList, pCtx->grpReadIdx); + if (!pPair->finishGrp) { + ASSERTS(pCtx->grpReadIdx == grpNum - 1, "unfinished grp not the last"); + taosArrayRemoveBatch(pCtx->pGrpResList, 0, pCtx->grpReadIdx - 1, NULL); + pCtx->grpReadIdx = 0; + break; + } + + if (pPair->hashJoin) { + contLoop = mLeftJoinHashOutput(pJoin, pPair); + } else if (pCtx->cartCtx.appendRes) { + contLoop = mLeftJoinDirectOutput(pJoin, pPair); + } + + if (!contLoop) { + return false; + } + } + +} + 
static bool mLeftJoinOutput(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { SMJoinOutputCtx* pCtx = &pJoin->ctx.mergeCtx.outputCtx; bool contLoop = false; int32_t grpNum = taosArrayGetSize(pCtx->pGrpResList); + + if (pCtx->cartCtx.appendRes) { + return mLeftJoinCartOutput(pJoin, pCtx); + } for (; pCtx->grpReadIdx < grpNum; pCtx->grpReadIdx++) { SGrpPairRes* pPair = taosArrayGet(pCtx->pGrpResList, pCtx->grpReadIdx); @@ -665,9 +761,9 @@ static bool mLeftJoinOutput(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) break; } - if (pPair->sameTsGrp) { - contLoop = mLeftJoinSameTsOutput(pJoin, pPair); - } else { + if (pPair->hashJoin) { + contLoop = mLeftJoinHashOutput(pJoin, pPair); + } else if (pCtx->cartCtx.appendRes) { contLoop = mLeftJoinDirectOutput(pJoin, pPair); } @@ -703,23 +799,6 @@ static SSDataBlock* mJoinDoLeftJoin(SOperatorInfo* pOperator, SMJoinOperatorInfo } -static SSDataBlock* mJoinHanleMergeJoin(SOperatorInfo* pOperator) { - SMJoinOperatorInfo* pJoin = pOperator->info; - SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; - SMJoinTableCtx* pProbeCtx = &pCtx->probeTbCtx; - SMJoinTableCtx* pBuildCtx = &pCtx->buildTbCtx; - int32_t code = TSDB_CODE_SUCCESS; - SSDataBlock* pBlock = NULL; - - while (true) { - pBlock = (*pJoin->joinFp)(pOperator, pJoin, pCtx, pBuildCtx, pProbeCtx); - if (pBlock && pBlock->info.rows > 0) { - return pBlock; - } - } -} - - SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoin = pOperator->info; if (pOperator->status == OP_EXEC_DONE) { @@ -788,7 +867,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t int32_t numOfCols = 0; pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); - initResultSizeInfo(&pOperator->resultInfo, 4096); + initResultSizeInfo(&pOperator->resultInfo, MJOIN_DEFAULT_BLK_ROWS_NUM); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); setOperatorInfo(pOperator, "MergeJoinOperator", 
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -798,7 +877,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t mJoinSetBuildAndProbeTable(pInfo, pJoinNode); - mJoinInitJoinCtx(pInfo); + mJoinInitCtx(pOperator, pInfo); code = mJoinBuildResColMap(pInfo, pJoinNode); if (code) { @@ -810,6 +889,13 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t goto _error; } + if (pJoinNode->pFullOnCond != NULL) { + code = filterInitFromNode(pJoinNode->pFullOnCond, &pInfo->pFPreFilter, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } + if (pJoinNode->pColOnCond != NULL) { code = filterInitFromNode(pJoinNode->pColOnCond, &pInfo->pPreFilter, 0); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/mergejoinoperator_old.c b/source/libs/executor/src/mergejoinoperator_old.c deleted file mode 100755 index 5e74edc47f..0000000000 --- a/source/libs/executor/src/mergejoinoperator_old.c +++ /dev/null @@ -1,848 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */ - -#include "executorInt.h" -#include "filter.h" -#include "function.h" -#include "operator.h" -#include "os.h" -#include "querynodes.h" -#include "querytask.h" -#include "tcompare.h" -#include "tdatablock.h" -#include "thash.h" -#include "tmsg.h" -#include "ttypes.h" - -typedef struct SMJoinRowCtx { - bool rowRemains; - int64_t ts; - SArray* leftRowLocations; - SArray* leftCreatedBlocks; - SArray* rightCreatedBlocks; - int32_t leftRowIdx; - int32_t rightRowIdx; - - bool rightUseBuildTable; - SArray* rightRowLocations; -} SMJoinRowCtx; - -typedef struct SMJoinOperatorInfo { - SSDataBlock* pRes; - int32_t joinType; - int32_t inputOrder; - bool downstreamInitDone[2]; - bool downstreamFetchDone[2]; - int16_t downstreamResBlkId[2]; - - SSDataBlock* pLeft; - int32_t leftPos; - SColumnInfo leftCol; - - SSDataBlock* pRight; - int32_t rightPos; - SColumnInfo rightCol; - SNode* pCondAfterMerge; - SNode* pColEqualOnConditions; - - SArray* leftEqOnCondCols; - char* leftEqOnCondKeyBuf; - int32_t leftEqOnCondKeyLen; - - SArray* rightEqOnCondCols; - char* rightEqOnCondKeyBuf; - int32_t rightEqOnCondKeyLen; - - SSHashObj* rightBuildTable; - SMJoinRowCtx rowCtx; - - int64_t resRows; -} SMJoinOperatorInfo; - -static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); -static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); -static void destroyMergeJoinOperator(void* param); -static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr); - -static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr) { - SNode* pPrimKeyCond = pJoinNode->pPrimKeyCond; - if (nodeType(pPrimKeyCond) != QUERY_NODE_OPERATOR) { - qError("not support this in join operator, %s", idStr); - return; // do not handle this - } - - SOperatorNode* pNode = (SOperatorNode*)pPrimKeyCond; - SColumnNode* col1 = (SColumnNode*)pNode->pLeft; - SColumnNode* col2 = 
(SColumnNode*)pNode->pRight; - SColumnNode* leftTsCol = NULL; - SColumnNode* rightTsCol = NULL; - if (col1->dataBlockId == col2->dataBlockId) { - leftTsCol = col1; - rightTsCol = col2; - } else { - if (col1->dataBlockId == pInfo->downstreamResBlkId[0]) { - ASSERT(col2->dataBlockId == pInfo->downstreamResBlkId[1]); - leftTsCol = col1; - rightTsCol = col2; - } else { - ASSERT(col1->dataBlockId == pInfo->downstreamResBlkId[1]); - ASSERT(col2->dataBlockId == pInfo->downstreamResBlkId[0]); - leftTsCol = col2; - rightTsCol = col1; - } - } - setJoinColumnInfo(&pInfo->leftCol, leftTsCol); - setJoinColumnInfo(&pInfo->rightCol, rightTsCol); -} - -static void extractEqualOnCondColsFromOper(SMJoinOperatorInfo* pInfo, SOperatorNode* pOperNode, - SColumn* pLeft, SColumn* pRight) { - SColumnNode* pLeftNode = (SColumnNode*)pOperNode->pLeft; - SColumnNode* pRightNode = (SColumnNode*)pOperNode->pRight; - if (pLeftNode->dataBlockId == pRightNode->dataBlockId || pLeftNode->dataBlockId == pInfo->downstreamResBlkId[0]) { - *pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft); - *pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight); - } else { - *pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight); - *pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft); - } -} - -static void extractEqualOnCondCols(SMJoinOperatorInfo* pInfo, SNode* pEqualOnCondNode, - SArray* leftTagEqCols, SArray* rightTagEqCols) { - SColumn left = {0}; - SColumn right = {0}; - if (nodeType(pEqualOnCondNode) == QUERY_NODE_LOGIC_CONDITION && ((SLogicConditionNode*)pEqualOnCondNode)->condType == LOGIC_COND_TYPE_AND) { - SNode* pNode = NULL; - FOREACH(pNode, ((SLogicConditionNode*)pEqualOnCondNode)->pParameterList) { - SOperatorNode* pOperNode = (SOperatorNode*)pNode; - extractEqualOnCondColsFromOper(pInfo, pOperNode, &left, &right); - taosArrayPush(leftTagEqCols, &left); - taosArrayPush(rightTagEqCols, &right); - } - return; - } - - if 
(nodeType(pEqualOnCondNode) == QUERY_NODE_OPERATOR) { - SOperatorNode* pOperNode = (SOperatorNode*)pEqualOnCondNode; - extractEqualOnCondColsFromOper(pInfo, pOperNode, &left, &right); - taosArrayPush(leftTagEqCols, &left); - taosArrayPush(rightTagEqCols, &right); - } -} - -static int32_t initTagColskeyBuf(int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { - int32_t numOfGroupCols = taosArrayGetSize(pGroupColList); - for (int32_t i = 0; i < numOfGroupCols; ++i) { - SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i); - (*keyLen) += pCol->bytes; // actual data + null_flag - } - - int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; - (*keyLen) += nullFlagSize; - - if (*keyLen >= 0) { - - (*keyBuf) = taosMemoryCalloc(1, (*keyLen)); - if ((*keyBuf) == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - } - - return TSDB_CODE_SUCCESS; -} - -static int32_t fillKeyBufFromTagCols(SArray* pCols, SSDataBlock* pBlock, int32_t rowIndex, void* pKey) { - SColumnDataAgg* pColAgg = NULL; - size_t numOfGroupCols = taosArrayGetSize(pCols); - char* isNull = (char*)pKey; - char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols; - - for (int32_t i = 0; i < numOfGroupCols; ++i) { - SColumn* pCol = (SColumn*) taosArrayGet(pCols, i); - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); - - // valid range check. todo: return error code. - if (pCol->slotId > taosArrayGetSize(pBlock->pDataBlock)) { - continue; - } - - if (pBlock->pBlockAgg != NULL) { - pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? 
- } - - if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { - isNull[i] = 1; - } else { - isNull[i] = 0; - char* val = colDataGetData(pColInfoData, rowIndex); - if (pCol->type == TSDB_DATA_TYPE_JSON) { - int32_t dataLen = getJsonValueLen(val); - memcpy(pStart, val, dataLen); - pStart += dataLen; - } else if (IS_VAR_DATA_TYPE(pCol->type)) { - varDataCopy(pStart, val); - pStart += varDataTLen(val); - } else { - memcpy(pStart, val, pCol->bytes); - pStart += pCol->bytes; - } - } - } - return (int32_t)(pStart - (char*)pKey); -} - -SOperatorInfo** buildMergeJoinDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream) { - SOperatorInfo** p = taosMemoryMalloc(2 * POINTER_BYTES); - if (p) { - p[0] = pDownstream[0]; - p[1] = pDownstream[0]; - pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(p[0], 0); - pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(p[1], 1); - } - - return p; -} - - -SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, - SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { - SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - bool newDownstreams = false; - - int32_t code = TSDB_CODE_SUCCESS; - if (pOperator == NULL || pInfo == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - - if (1 == numOfDownstream) { - newDownstreams = true; - pDownstream = buildMergeJoinDownstreams(pInfo, pDownstream); - if (NULL == pDownstream) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - numOfDownstream = 2; - } else { - pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(pDownstream[0], 0); - pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(pDownstream[1], 0); - } - - int32_t numOfCols = 0; - pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); - - SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, 
&numOfCols); - initResultSizeInfo(&pOperator->resultInfo, 4096); - blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - - setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; - - extractTimeCondition(pInfo, pJoinNode, GET_TASKID(pTaskInfo)); - - if (pJoinNode->pFullOnCond != NULL && pJoinNode->node.pConditions != NULL) { - pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); - if (pInfo->pCondAfterMerge == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - - SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterMerge); - pLogicCond->pParameterList = nodesMakeList(); - if (pLogicCond->pParameterList == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - - nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pFullOnCond)); - nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions)); - pLogicCond->condType = LOGIC_COND_TYPE_AND; - } else if (pJoinNode->pFullOnCond != NULL) { - pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pFullOnCond); - } else if (pJoinNode->pColEqCond != NULL) { - pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pColEqCond); - } else if (pJoinNode->node.pConditions != NULL) { - pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->node.pConditions); - } else { - pInfo->pCondAfterMerge = NULL; - } - - code = filterInitFromNode(pInfo->pCondAfterMerge, &pOperator->exprSupp.pFilterInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - pInfo->inputOrder = TSDB_ORDER_ASC; - if (pJoinNode->node.inputTsOrder == ORDER_ASC) { - pInfo->inputOrder = TSDB_ORDER_ASC; - } else if (pJoinNode->node.inputTsOrder == ORDER_DESC) { - pInfo->inputOrder = TSDB_ORDER_DESC; - } - - pInfo->pColEqualOnConditions = pJoinNode->pColEqCond; - if 
(pInfo->pColEqualOnConditions != NULL) { - pInfo->leftEqOnCondCols = taosArrayInit(4, sizeof(SColumn)); - pInfo->rightEqOnCondCols = taosArrayInit(4, sizeof(SColumn)); - extractEqualOnCondCols(pInfo, pInfo->pColEqualOnConditions, pInfo->leftEqOnCondCols, pInfo->rightEqOnCondCols); - initTagColskeyBuf(&pInfo->leftEqOnCondKeyLen, &pInfo->leftEqOnCondKeyBuf, pInfo->leftEqOnCondCols); - initTagColskeyBuf(&pInfo->rightEqOnCondKeyLen, &pInfo->rightEqOnCondKeyBuf, pInfo->rightEqOnCondCols); - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - pInfo->rightBuildTable = tSimpleHashInit(256, hashFn); - } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); - - code = appendDownstream(pOperator, pDownstream, numOfDownstream); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - if (newDownstreams) { - taosMemoryFree(pDownstream); - } - - pOperator->numOfRealDownstream = newDownstreams ? 
1 : 2; - - return pOperator; - -_error: - if (pInfo != NULL) { - destroyMergeJoinOperator(pInfo); - } - if (newDownstreams) { - taosMemoryFree(pDownstream); - } - - taosMemoryFree(pOperator); - pTaskInfo->code = code; - return NULL; -} - -void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { - pColumn->slotId = pColumnNode->slotId; - pColumn->type = pColumnNode->node.resType.type; - pColumn->bytes = pColumnNode->node.resType.bytes; - pColumn->precision = pColumnNode->node.resType.precision; - pColumn->scale = pColumnNode->node.resType.scale; -} - -static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) { - void* p = NULL; - int32_t iter = 0; - - while ((p = tSimpleHashIterate(pBuildTable, p, &iter)) != NULL) { - SArray* rows = (*(SArray**)p); - taosArrayDestroy(rows); - } - - tSimpleHashCleanup(pBuildTable); -} - -void destroyMergeJoinOperator(void* param) { - SMJoinOperatorInfo* pJoinOperator = (SMJoinOperatorInfo*)param; - if (pJoinOperator->pColEqualOnConditions != NULL) { - mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable); - taosMemoryFreeClear(pJoinOperator->rightEqOnCondKeyBuf); - taosArrayDestroy(pJoinOperator->rightEqOnCondCols); - - taosMemoryFreeClear(pJoinOperator->leftEqOnCondKeyBuf); - taosArrayDestroy(pJoinOperator->leftEqOnCondCols); - } - nodesDestroyNode(pJoinOperator->pCondAfterMerge); - - taosArrayDestroy(pJoinOperator->rowCtx.leftCreatedBlocks); - taosArrayDestroy(pJoinOperator->rowCtx.rightCreatedBlocks); - taosArrayDestroy(pJoinOperator->rowCtx.leftRowLocations); - taosArrayDestroy(pJoinOperator->rowCtx.rightRowLocations); - - pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); - taosMemoryFreeClear(param); -} - -static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow, - SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, - int32_t rightPos) { - SMJoinOperatorInfo* pJoinInfo = pOperator->info; - - for (int32_t i = 0; i < 
pOperator->exprSupp.numOfExprs; ++i) { - SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i); - - SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[i]; - - int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId; - int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId; - int32_t rowIndex = -1; - - SColumnInfoData* pSrc = NULL; - if (pLeftBlock->info.id.blockId == blockId) { - pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId); - rowIndex = leftPos; - } else { - pSrc = taosArrayGet(pRightBlock->pDataBlock, slotId); - rowIndex = rightPos; - } - - if (colDataIsNull_s(pSrc, rowIndex)) { - colDataSetNULL(pDst, currRow); - } else { - char* p = colDataGetData(pSrc, rowIndex); - colDataSetVal(pDst, currRow, p, false); - } - } -} -typedef struct SRowLocation { - SSDataBlock* pDataBlock; - int32_t pos; -} SRowLocation; - -// pBlock[tsSlotId][startPos, endPos) == timestamp, -static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp, - int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) { - int32_t numRows = pBlock->info.rows; - ASSERT(startPos < numRows); - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId); - - int32_t i = startPos; - for (; i < numRows; ++i) { - char* pNextVal = colDataGetData(pCol, i); - if (timestamp != *(int64_t*)pNextVal) { - break; - } - } - int32_t endPos = i; - *pEndPos = endPos; - - if (endPos - startPos == 0) { - return 0; - } - - SSDataBlock* block = pBlock; - bool createdNewBlock = false; - if (endPos == numRows) { - block = blockDataExtractBlock(pBlock, startPos, endPos - startPos); - taosArrayPush(createdBlocks, &block); - createdNewBlock = true; - } - SRowLocation location = {0}; - for (int32_t j = startPos; j < endPos; ++j) { - location.pDataBlock = block; - location.pos = (createdNewBlock ? 
j - startPos : j); - taosArrayPush(rowLocations, &location); - } - return 0; -} - -// whichChild == 0, left child of join; whichChild ==1, right child of join -static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator, int32_t whichChild, int16_t tsSlotId, - SSDataBlock* startDataBlock, int32_t startPos, - int64_t timestamp, SArray* rowLocations, - SArray* createdBlocks) { - ASSERT(whichChild == 0 || whichChild == 1); - - SMJoinOperatorInfo* pJoinInfo = pOperator->info; - int32_t endPos = -1; - SSDataBlock* dataBlock = startDataBlock; - mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks); - while (endPos == dataBlock->info.rows) { - SOperatorInfo* ds = pOperator->pDownstream[whichChild]; - dataBlock = getNextBlockFromDownstreamRemain(pOperator, whichChild); - qError("merge join %s got block for same ts, rows:%" PRId64, whichChild == 0 ? "left" : "right", dataBlock ? dataBlock->info.rows : 0); - if (whichChild == 0) { - pJoinInfo->leftPos = 0; - pJoinInfo->pLeft = dataBlock; - } else if (whichChild == 1) { - pJoinInfo->rightPos = 0; - pJoinInfo->pRight = dataBlock; - } - - if (dataBlock == NULL) { - pJoinInfo->downstreamFetchDone[whichChild] = true; - endPos = -1; - break; - } - - mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, 0, timestamp, &endPos, rowLocations, createdBlocks); - } - if (endPos != -1) { - if (whichChild == 0) { - pJoinInfo->leftPos = endPos; - } else if (whichChild == 1) { - pJoinInfo->rightPos = endPos; - } - } - return 0; -} - -static int32_t mergeJoinFillBuildTable(SMJoinOperatorInfo* pInfo, SArray* rightRowLocations) { - for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) { - SRowLocation* rightRow = taosArrayGet(rightRowLocations, i); - int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightEqOnCondCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightEqOnCondKeyBuf); - SArray** ppRows = tSimpleHashGet(pInfo->rightBuildTable, 
pInfo->rightEqOnCondKeyBuf, keyLen); - if (!ppRows) { - SArray* rows = taosArrayInit(4, sizeof(SRowLocation)); - taosArrayPush(rows, rightRow); - tSimpleHashPut(pInfo->rightBuildTable, pInfo->rightEqOnCondKeyBuf, keyLen, &rows, POINTER_BYTES); - } else { - taosArrayPush(*ppRows, rightRow); - } - } - return TSDB_CODE_SUCCESS; -} - -static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t* nRows, - const SArray* leftRowLocations, int32_t leftRowIdx, - int32_t rightRowIdx, bool useBuildTableTSRange, SArray* rightRowLocations, bool* pReachThreshold) { - *pReachThreshold = false; - uint32_t limitRowNum = pOperator->resultInfo.threshold; - SMJoinOperatorInfo* pJoinInfo = pOperator->info; - size_t leftNumJoin = taosArrayGetSize(leftRowLocations); - - int32_t i,j; - - for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) { - SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); - SArray* pRightRows = NULL; - if (useBuildTableTSRange) { - int32_t keyLen = fillKeyBufFromTagCols(pJoinInfo->leftEqOnCondCols, leftRow->pDataBlock, leftRow->pos, pJoinInfo->leftEqOnCondKeyBuf); - SArray** ppRightRows = tSimpleHashGet(pJoinInfo->rightBuildTable, pJoinInfo->leftEqOnCondKeyBuf, keyLen); - if (!ppRightRows) { - continue; - } - pRightRows = *ppRightRows; - } else { - pRightRows = rightRowLocations; - } - size_t rightRowsSize = taosArrayGetSize(pRightRows); - for (j = rightRowIdx; j < rightRowsSize; ++j) { - if (*nRows >= limitRowNum) { - *pReachThreshold = true; - break; - } - - SRowLocation* rightRow = taosArrayGet(pRightRows, j); - mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock, - rightRow->pos); - ++*nRows; - } - if (*pReachThreshold) { - break; - } - } - - if (*pReachThreshold) { - pJoinInfo->rowCtx.rowRemains = true; - pJoinInfo->rowCtx.leftRowIdx = i; - pJoinInfo->rowCtx.rightRowIdx = j; - } - return TSDB_CODE_SUCCESS; -} - -static void 
mergeJoinDestroyTSRangeCtx(SMJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks, - SArray* rightCreatedBlocks, bool rightUseBuildTable, SArray* rightRowLocations) { - for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { - SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); - blockDataDestroy(pBlock); - } - taosArrayDestroy(rightCreatedBlocks); - for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) { - SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i); - blockDataDestroy(pBlock); - } - if (rightRowLocations != NULL) { - taosArrayDestroy(rightRowLocations); - } - if (rightUseBuildTable) { - void* p = NULL; - int32_t iter = 0; - while ((p = tSimpleHashIterate(pJoinInfo->rightBuildTable, p, &iter)) != NULL) { - SArray* rows = (*(SArray**)p); - taosArrayDestroy(rows); - } - tSimpleHashClear(pJoinInfo->rightBuildTable); - } - - taosArrayDestroy(leftCreatedBlocks); - taosArrayDestroy(leftRowLocations); - - pJoinInfo->rowCtx.rowRemains = false; - pJoinInfo->rowCtx.leftRowLocations = NULL; - pJoinInfo->rowCtx.leftCreatedBlocks = NULL; - pJoinInfo->rowCtx.rightCreatedBlocks = NULL; - pJoinInfo->rowCtx.rightUseBuildTable = false; - pJoinInfo->rowCtx.rightRowLocations = NULL; -} - -static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes, - int32_t* nRows) { - int32_t code = TSDB_CODE_SUCCESS; - SMJoinOperatorInfo* pJoinInfo = pOperator->info; - SArray* leftRowLocations = NULL; - SArray* rightRowLocations = NULL; - SArray* leftCreatedBlocks = NULL; - SArray* rightCreatedBlocks = NULL; - int32_t leftRowIdx = 0; - int32_t rightRowIdx = 0; - SSHashObj* rightTableHash = NULL; - bool rightUseBuildTable = false; - - if (pJoinInfo->rowCtx.rowRemains) { - leftRowLocations = pJoinInfo->rowCtx.leftRowLocations; - leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks; - rightUseBuildTable = pJoinInfo->rowCtx.rightUseBuildTable; - rightRowLocations = 
pJoinInfo->rowCtx.rightRowLocations; - rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks; - leftRowIdx = pJoinInfo->rowCtx.leftRowIdx; - rightRowIdx = pJoinInfo->rowCtx.rightRowIdx; - } else { - leftRowLocations = taosArrayInit(8, sizeof(SRowLocation)); - leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES); - - rightRowLocations = taosArrayInit(8, sizeof(SRowLocation)); - rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES); - - mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft, - pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks); - mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, - pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); - if (pJoinInfo->pColEqualOnConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) { - mergeJoinFillBuildTable(pJoinInfo, rightRowLocations); - rightUseBuildTable = true; - taosArrayDestroy(rightRowLocations); - rightRowLocations = NULL; - } - } - - size_t leftNumJoin = taosArrayGetSize(leftRowLocations); - code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.threshold); - if (code != TSDB_CODE_SUCCESS) { - qError("%s can not ensure block capacity for join. 
left: %zu", GET_TASKID(pOperator->pTaskInfo), - leftNumJoin); - } - - bool reachThreshold = false; - - if (code == TSDB_CODE_SUCCESS) { - mergeJoinLeftRowsRightRows(pOperator, pRes, nRows, leftRowLocations, leftRowIdx, - rightRowIdx, rightUseBuildTable, rightRowLocations, &reachThreshold); - } - - if (!reachThreshold) { - mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks, - rightUseBuildTable, rightRowLocations); - - } else { - pJoinInfo->rowCtx.rowRemains = true; - pJoinInfo->rowCtx.ts = timestamp; - pJoinInfo->rowCtx.leftRowLocations = leftRowLocations; - pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks; - pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks; - pJoinInfo->rowCtx.rightUseBuildTable = rightUseBuildTable; - pJoinInfo->rowCtx.rightRowLocations = rightRowLocations; - } - return TSDB_CODE_SUCCESS; -} - -static void setMergeJoinDone(SOperatorInfo* pOperator) { - setOperatorCompleted(pOperator); - if (pOperator->pDownstreamGetParams) { - freeOperatorParam(pOperator->pDownstreamGetParams[0], OP_GET_PARAM); - freeOperatorParam(pOperator->pDownstreamGetParams[1], OP_GET_PARAM); - pOperator->pDownstreamGetParams[0] = NULL; - pOperator->pDownstreamGetParams[1] = NULL; - } -} - -static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { - SMJoinOperatorInfo* pJoinInfo = pOperator->info; - bool leftEmpty = false; - - if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { - if (!pJoinInfo->downstreamFetchDone[0]) { - pJoinInfo->pLeft = getNextBlockFromDownstreamRemain(pOperator, 0); - pJoinInfo->downstreamInitDone[0] = true; - - pJoinInfo->leftPos = 0; - qError("merge join left got block, rows:%" PRId64, pJoinInfo->pLeft ? 
pJoinInfo->pLeft->info.rows : 0); - } else { - pJoinInfo->pLeft = NULL; - } - - if (pJoinInfo->pLeft == NULL) { - if (pOperator->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initDownstream && !pJoinInfo->downstreamInitDone[1]) { - leftEmpty = true; - } else { - setMergeJoinDone(pOperator); - return false; - } - } - } - - if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { - if (!pJoinInfo->downstreamFetchDone[1]) { - pJoinInfo->pRight = getNextBlockFromDownstreamRemain(pOperator, 1); - pJoinInfo->downstreamInitDone[1] = true; - - pJoinInfo->rightPos = 0; - qError("merge join right got block, rows:%" PRId64, pJoinInfo->pRight ? pJoinInfo->pRight->info.rows : 0); - } else { - pJoinInfo->pRight = NULL; - } - - if (pJoinInfo->pRight == NULL) { - setMergeJoinDone(pOperator); - return false; - } else { - if (leftEmpty) { - setMergeJoinDone(pOperator); - return false; - } - } - } - - if (NULL == pJoinInfo->pLeft || NULL == pJoinInfo->pRight) { - setMergeJoinDone(pOperator); - return false; - } - - // only the timestamp match support for ordinary table - SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId); - char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos); - *pLeftTs = *(int64_t*)pLeftVal; - - SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId); - char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos); - *pRightTs = *(int64_t*)pRightVal; - - return true; -} - -static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { - SMJoinOperatorInfo* pJoinInfo = pOperator->info; - - int32_t nrows = pRes->info.rows; - - bool asc = (pJoinInfo->inputOrder == TSDB_ORDER_ASC) ? 
true : false; - - while (1) { - int64_t leftTs = 0; - int64_t rightTs = 0; - if (pJoinInfo->rowCtx.rowRemains) { - leftTs = pJoinInfo->rowCtx.ts; - rightTs = pJoinInfo->rowCtx.ts; - } else { - bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); - if (!hasNextTs) { - break; - } - } - - if (leftTs == rightTs) { - mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows); - } else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) { - pJoinInfo->leftPos += 1; - - if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) { - continue; - } - } else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) { - pJoinInfo->rightPos += 1; - if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) { - continue; - } - } - - // the pDataBlock are always the same one, no need to call this again - pRes->info.rows = nrows; - pRes->info.dataLoad = 1; - pRes->info.scanFlag = MAIN_SCAN; - if (pRes->info.rows >= pOperator->resultInfo.threshold) { - break; - } - } -} - -void resetMergeJoinOperator(struct SOperatorInfo* pOperator) { - SMJoinOperatorInfo* pJoinInfo = pOperator->info; - if (pJoinInfo->rowCtx.rowRemains) { - mergeJoinDestroyTSRangeCtx(pJoinInfo, pJoinInfo->rowCtx.leftRowLocations, pJoinInfo->rowCtx.leftCreatedBlocks, pJoinInfo->rowCtx.rightCreatedBlocks, - pJoinInfo->rowCtx.rightUseBuildTable, pJoinInfo->rowCtx.rightRowLocations); - } - pJoinInfo->pLeft = NULL; - pJoinInfo->leftPos = 0; - pJoinInfo->pRight = NULL; - pJoinInfo->rightPos = 0; - pJoinInfo->downstreamFetchDone[0] = false; - pJoinInfo->downstreamFetchDone[1] = false; - pJoinInfo->downstreamInitDone[0] = false; - pJoinInfo->downstreamInitDone[1] = false; - pJoinInfo->resRows = 0; - pOperator->status = OP_OPENED; -} - -SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { - SMJoinOperatorInfo* pJoinInfo = pOperator->info; - if (pOperator->status == OP_EXEC_DONE) { 
- if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] || NULL == pOperator->pDownstreamGetParams[1]) { - qError("total merge join res rows:%" PRId64, pJoinInfo->resRows); - return NULL; - } else { - resetMergeJoinOperator(pOperator); - qError("start new merge join"); - } - } - - int64_t st = 0; - if (pOperator->cost.openCost == 0) { - st = taosGetTimestampUs(); - } - - SSDataBlock* pRes = pJoinInfo->pRes; - blockDataCleanup(pRes); - - while (true) { - int32_t numOfRowsBefore = pRes->info.rows; - doMergeJoinImpl(pOperator, pRes); - int32_t numOfNewRows = pRes->info.rows - numOfRowsBefore; - if (numOfNewRows == 0) { - break; - } - if (pOperator->exprSupp.pFilterInfo != NULL) { - doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); - } - if (pRes->info.rows >= pOperator->resultInfo.threshold) { - break; - } - if (pOperator->status == OP_EXEC_DONE) { - break; - } - } - - if (pOperator->cost.openCost == 0) { - pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; - } - - if (pRes->info.rows > 0) { - pJoinInfo->resRows += pRes->info.rows; - qError("merge join returns res rows:%" PRId64, pRes->info.rows); - return pRes; - } else { - qError("total merge join res rows:%" PRId64, pJoinInfo->resRows); - return NULL; - } -} - - -