diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 99ef9c9548..e9e4ed403d 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -190,12 +190,14 @@ int32_t getJsonValueLen(const char* data); int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull); int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData); int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, bool trimValue); +void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow, uint32_t numOfRows); int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows, bool isNull); int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource, int32_t numOfRow2); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, const SDataBlockInfo* pBlockInfo); +int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx, int32_t numOfRows); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex); int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 3150b7acce..65e7e1f165 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -484,6 +484,7 @@ typedef struct SSortMergeJoinPhysiNode { SNode* pColOnCond; SNode* pFullOnCond; SNodeList* pTargets; + SQueryStat inputStat[2]; } SSortMergeJoinPhysiNode; typedef struct SHashJoinPhysiNode { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 0a13e2ba38..a3fc95f4db 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -540,7 +540,7 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI } else { if (pSrc->hasNull) { if (0 == BitPos(dstIdx) && 0 == BitPos(srcIdx)) { - memcpy(BMCharPos(pDst->nullbitmap, dstIdx), BMCharPos(pSrc->nullbitmap, srcIdx), BitmapLen(numOfRows)); + memcpy(&BMCharPos(pDst->nullbitmap, dstIdx), &BMCharPos(pSrc->nullbitmap, srcIdx), BitmapLen(numOfRows)); if (!pDst->hasNull) { int32_t nullBytes = BitmapLen(numOfRows); int32_t startPos = CharPos(dstIdx); @@ -554,7 +554,7 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI } else if (BitPos(dstIdx) == BitPos(srcIdx)) { for (int32_t i = 0; i < numOfRows; ++i) { if (0 == BitPos(dstIdx)) { - memcpy(BMCharPos(pDst->nullbitmap, dstIdx + i), BMCharPos(pSrc->nullbitmap, srcIdx + i), BitmapLen(numOfRows - i)); + memcpy(&BMCharPos(pDst->nullbitmap, dstIdx + i), &BMCharPos(pSrc->nullbitmap, srcIdx + i), BitmapLen(numOfRows - i)); if (!pDst->hasNull) { int32_t nullBytes = BitmapLen(numOfRows - i); int32_t startPos = CharPos(dstIdx + i); @@ -586,7 +586,7 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI } } } else { - memset(BMCharPos(pDst->nullbitmap, dstIdx), 0, BitmapLen(numOfRows)); + memset(&BMCharPos(pDst->nullbitmap, dstIdx), 0, BitmapLen(numOfRows)); } if (pSrc->pData != NULL) { diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index af61f11f35..91706bd87a 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -23,6 +23,8 @@ extern "C" { #define MJOIN_HJOIN_CART_THRESHOLD 16 #define MJOIN_BLK_SIZE_LIMIT 10485760 +struct SMJoinOperatorInfo; + typedef SSDataBlock* (*joinImplFp)(SOperatorInfo*); typedef enum EJoinTableType { @@ -55,7 +57,7 @@ typedef struct SMJoinColInfo { } SMJoinColInfo; -typedef struct SMJoinTableInfo { +typedef struct SMJoinTableCtx { EJoinTableType type; int32_t downStreamIdx; SOperatorInfo* downStream; @@ -87,19 +89,22 @@ typedef struct SMJoinTableInfo { SSDataBlock* blk; int32_t blkRowIdx; + // merge join + int64_t grpTotalRows; int32_t grpIdx; SArray* eqGrps; SArray* createdBlks; + // hash join + int32_t grpArrayIdx; SArray* pGrpArrays; - // hash join int32_t grpRowIdx; SArray* pHashCurGrp; SSHashObj* pGrpHash; -} SMJoinTableInfo; +} SMJoinTableCtx; typedef struct SMJoinGrpRows { SSDataBlock* blk; @@ -110,6 +115,7 @@ typedef struct SMJoinGrpRows { } SMJoinGrpRows; typedef struct SMJoinMergeCtx { + struct SMJoinOperatorInfo* pJoin; bool hashCan; bool keepOrder; bool grpRemains; @@ -121,7 +127,6 @@ typedef struct SMJoinMergeCtx { int64_t lastEqTs; SMJoinGrpRows probeNEqGrp; bool hashJoin; - SMJoinOperatorInfo* pJoin; } SMJoinMergeCtx; typedef struct SMJoinWinCtx { @@ -161,11 +166,9 @@ typedef struct SMJoinOperatorInfo { int32_t subType; int32_t inputTsOrder; int32_t errCode; - SMJoinTableInfo tbs[2]; - SMJoinTableInfo* build; - SMJoinTableInfo* probe; - int32_t pResColNum; - int8_t* pResColMap; + SMJoinTableCtx tbs[2]; + SMJoinTableCtx* build; + SMJoinTableCtx* probe; SFilterInfo* pFPreFilter; SFilterInfo* pPreFilter; SFilterInfo* pFinFilter; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 221e923214..e4019bb742 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -32,8 +32,6 @@ SOperatorInfo** mJoinBuildDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** if (p) { p[0] = pDownstream[0]; p[1] = pDownstream[0]; - pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(p[0], 0); - pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(p[1], 1); } return p; @@ -47,15 +45,12 @@ int32_t mJoinInitDownstreamInfo(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDown return TSDB_CODE_OUT_OF_MEMORY; } *numOfDownstream = 2; - } else { - pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(pDownstream[0], 0); - pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(pDownstream[1], 0); } return TSDB_CODE_SUCCESS; } -static int32_t mJoinInitPrimKeyInfo(SMJoinTableInfo* pTable, int32_t slotId) { +static int32_t mJoinInitPrimKeyInfo(SMJoinTableCtx* pTable, int32_t slotId) { pTable->primCol = taosMemoryMalloc(sizeof(SMJoinColInfo)); if (NULL == pTable->primCol) { return TSDB_CODE_OUT_OF_MEMORY; @@ -66,71 +61,7 @@ static int32_t mJoinInitPrimKeyInfo(SMJoinTableInfo* pTable, int32_t slotId) { return TSDB_CODE_SUCCESS; } -static void mJoinGetValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) { - *colNum = 0; - - SNode* pNode = NULL; - FOREACH(pNode, pList) { - STargetNode* pTarget = (STargetNode*)pNode; - SColumnNode* pCol = (SColumnNode*)pTarget->pExpr; - if (pCol->dataBlockId == blkId) { - (*colNum)++; - } - } -} - -static int32_t mJoinInitValColsInfo(SMJoinTableInfo* pTable, SNodeList* pList) { - mJoinGetValColNum(pList, pTable->blkId, &pTable->valNum); - if (pTable->valNum == 0) { - return TSDB_CODE_SUCCESS; - } - - pTable->valCols = taosMemoryMalloc(pTable->valNum * sizeof(SMJoinColInfo)); - if (NULL == pTable->valCols) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - int32_t i = 0; - int32_t colNum = 0; - SNode* pNode = NULL; - FOREACH(pNode, pList) { - STargetNode* pTarget = (STargetNode*)pNode; - SColumnNode* pColNode = (SColumnNode*)pTarget->pExpr; - if (pColNode->dataBlockId == pTable->blkId) { - if (valColInKeyCols(pColNode->slotId, pTable->keyNum, pTable->keyCols, &pTable->valCols[i].srcSlot)) { - pTable->valCols[i].keyCol = true; - } else { - pTable->valCols[i].keyCol = false; - pTable->valCols[i].srcSlot = pColNode->slotId; - pTable->valColExist = true; - colNum++; - } - pTable->valCols[i].dstSlot = pTarget->slotId; - pTable->valCols[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); - if (pTable->valCols[i].vardata) { - if (NULL == pTable->valVarCols) { - pTable->valVarCols = taosArrayInit(pTable->valNum, sizeof(int32_t)); - if (NULL == pTable->valVarCols) { - return TSDB_CODE_OUT_OF_MEMORY; - } - } - taosArrayPush(pTable->valVarCols, &i); - } - pTable->valCols[i].bytes = pColNode->node.resType.bytes; - if (!pTable->valCols[i].keyCol && !pTable->valCols[i].vardata) { - pTable->valBufSize += pColNode->node.resType.bytes; - } - i++; - } - } - - pTable->valBitMapSize = BitmapLen(colNum); - pTable->valBufSize += pTable->valBitMapSize; - - return TSDB_CODE_SUCCESS; -} - -static int32_t mJoinInitKeyColsInfo(SMJoinTableInfo* pTable, SNodeList* pList) { +static int32_t mJoinInitKeyColsInfo(SMJoinTableCtx* pTable, SNodeList* pList) { pTable->keyNum = LIST_LENGTH(pList); pTable->keyCols = taosMemoryMalloc(pTable->keyNum * sizeof(SMJoinColInfo)); @@ -161,7 +92,7 @@ static int32_t mJoinInitKeyColsInfo(SMJoinTableInfo* pTable, SNodeList* pList) { } static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) { - SMJoinTableInfo* pTable = &pJoin->tbs[idx]; + SMJoinTableCtx* pTable = &pJoin->tbs[idx]; pTable->downStream = pDownstream[idx]; pTable->blkId = pDownstream[idx]->resultDataBlockId; int32_t code = mJoinInitPrimKeyInfo(pTable, (0 == idx) ? pJoinNode->leftPrimSlotId : pJoinNode->rightPrimSlotId); @@ -172,11 +103,12 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi if (code) { return code; } +/* code = mJoinInitValColsInfo(pTable, pJoinNode->pTargets); if (code) { return code; } - +*/ memcpy(&pTable->inputStat, pStat, sizeof(*pStat)); pTable->eqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows)); @@ -232,28 +164,6 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin pInfo->probe->type = E_JOIN_TB_PROBE; } -static int32_t mJoinBuildResColMap(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode) { - pInfo->pResColNum = pJoinNode->pTargets->length; - pInfo->pResColMap = taosMemoryCalloc(pJoinNode->pTargets->length, sizeof(int8_t)); - if (NULL == pInfo->pResColMap) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - SNode* pNode = NULL; - int32_t i = 0; - FOREACH(pNode, pJoinNode->pTargets) { - STargetNode* pTarget = (STargetNode*)pNode; - SColumnNode* pCol = (SColumnNode*)pTarget->pExpr; - if (pCol->dataBlockId == pInfo->build->blkId) { - pInfo->pResColMap[i] = 1; - } - - i++; - } - - return TSDB_CODE_SUCCESS; -} - static int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; @@ -261,7 +171,7 @@ static int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiN pCtx->hashCan = pJoin->probe->keyNum > 0; pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); - blockDataEnsureCapacity(pCtx->finBlk, TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc.totalRowSize)); + blockDataEnsureCapacity(pCtx->finBlk, TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize)); if (pJoin->pFPreFilter) { pCtx->midBlk = createOneDataBlock(pCtx->finBlk, false); @@ -297,14 +207,14 @@ static void mJoinSetDone(SOperatorInfo* pOperator) { } static int32_t mLeftJoinGrpNonEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp) { - SMJoinTableInfo* probe = pJoin->probe; - SMJoinTableInfo* build = pJoin->build; + SMJoinTableCtx* probe = pJoin->probe; + SMJoinTableCtx* build = pJoin->build; int32_t currRows = append ? pRes->info.rows : 0; int32_t firstRows = GRP_REMAIN_ROWS(pGrp); for (int32_t c = 0; c < probe->finNum; ++c) { SMJoinColMap* pFirstCol = probe->finCols + c; - SColumnInfoData* pInCol = taosArrayGet(pGrp->blk, pFirstCol->srcSlot); + SColumnInfoData* pInCol = taosArrayGet(pGrp->blk->pDataBlock, pFirstCol->srcSlot); SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot); colDataAssignNRows(pOutCol, currRows, pInCol, pGrp->readIdx, firstRows); } @@ -320,15 +230,15 @@ static int32_t mLeftJoinGrpNonEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRe } static int32_t mLeftJoinGrpEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond) { - SMJoinTableInfo* probe = pJoin->probe; - SMJoinTableInfo* build = pJoin->build; + SMJoinTableCtx* probe = pJoin->probe; + SMJoinTableCtx* build = pJoin->build; int32_t currRows = append ? pRes->info.rows : 0; int32_t firstRows = GRP_REMAIN_ROWS(pFirst); int32_t secondRows = GRP_REMAIN_ROWS(pSecond); for (int32_t c = 0; c < probe->finNum; ++c) { SMJoinColMap* pFirstCol = probe->finCols + c; - SColumnInfoData* pInCol = taosArrayGet(pFirst->blk, pFirstCol->srcSlot); + SColumnInfoData* pInCol = taosArrayGet(pFirst->blk->pDataBlock, pFirstCol->srcSlot); SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot); for (int32_t r = 0; r < firstRows; ++r) { if (colDataIsNull_s(pInCol, pFirst->readIdx + r)) { @@ -341,7 +251,7 @@ static int32_t mLeftJoinGrpEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, for (int32_t c = 0; c < build->finNum; ++c) { SMJoinColMap* pSecondCol = build->finCols + c; - SColumnInfoData* pInCol = taosArrayGet(pSecond->blk, pSecondCol->srcSlot); + SColumnInfoData* pInCol = taosArrayGet(pSecond->blk->pDataBlock, pSecondCol->srcSlot); SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot); for (int32_t r = 0; r < firstRows; ++r) { colDataAssignNRows(pOutCol, currRows + r * secondRows, pInCol, pSecond->readIdx, secondRows); @@ -355,8 +265,8 @@ static int32_t mLeftJoinGrpEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, static int32_t mLeftJoinMergeFullCart(SMJoinMergeCtx* pCtx) { int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; - SMJoinTableInfo* probe = pCtx->pJoin->probe; - SMJoinTableInfo* build = pCtx->pJoin->build; + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0); int32_t buildGrpNum = taosArrayGetSize(build->eqGrps); int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); @@ -407,7 +317,7 @@ static int32_t mLeftJoinMergeFullCart(SMJoinMergeCtx* pCtx) { static int32_t mLeftJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) { SSDataBlock* pLess = NULL; SSDataBlock* pMore = NULL; - if ((*ppMid)->info.rows < ppFin->info.rows) { + if ((*ppMid)->info.rows < (*ppFin)->info.rows) { pLess = (*ppMid); pMore = (*ppFin); } else { @@ -418,7 +328,7 @@ static int32_t mLeftJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMi int32_t totalRows = pMore->info.rows + pLess->info.rows; if (totalRows <= pMore->info.capacity) { MJ_ERR_RET(blockDataMerge(pMore, pLess)); - tDataBlkReset(pLess); + blockDataReset(pLess); pCtx->midRemains = false; } else { int32_t copyRows = pMore->info.capacity - pMore->info.rows; @@ -434,8 +344,8 @@ static int32_t mLeftJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMi } static int32_t mLeftJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { - SMJoinTableInfo* probe = pCtx->pJoin->probe; - SMJoinTableInfo* build = pCtx->pJoin->build; + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); int32_t buildGrpNum = taosArrayGetSize(build->eqGrps); int32_t probeEndIdx = probeGrp->endIdx; @@ -498,7 +408,7 @@ static int32_t mLeftJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { } } - if (GRP_DONE(probeGrp->readIdx) || BLK_IS_FULL(pCtx->finBlk)) { + if (GRP_DONE(probeGrp) || BLK_IS_FULL(pCtx->finBlk)) { break; } @@ -541,20 +451,20 @@ static int32_t mLeftJoinNonEqCart(SMJoinMergeCtx* pCtx) { -static bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableInfo* ppTb) { - if ((*ppTb)->dsFetchDone) { +static bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb) { + if (pTb->dsFetchDone) { return (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows) ? false : true; } if (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows) { - (*ppBlk) = getNextBlockFromDownstreamRemain(pJoin->pOperator, (*ppTb)->downStreamIdx); - (*ppTb)->dsInitDone = true; + (*ppBlk) = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTb->downStreamIdx); + pTb->dsInitDone = true; - qDebug("%s merge join %s table got %" PRId64 " rows block", GET_TASKID(pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(ppTb->type), (*ppBlk) ? (*ppBlk)->info.rows : 0); + qDebug("%s merge join %s table got %" PRId64 " rows block", GET_TASKID(pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pTb->type), (*ppBlk) ? (*ppBlk)->info.rows : 0); *pIdx = 0; if (NULL == (*ppBlk)) { - (*ppTb)->dsFetchDone = true; + pTb->dsFetchDone = true; } return ((*ppBlk) == NULL) ? false : true; @@ -565,12 +475,12 @@ static bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBl static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { - bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, &pJoin->probe); + bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool buildGot = false; do { if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { - buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, &pJoin->build); + buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); } if (!probeGot) { @@ -592,50 +502,22 @@ static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoi return true; } -static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp, - int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) { - int32_t numRows = pBlock->info.rows; - ASSERT(startPos < numRows); - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId); - - int32_t i = startPos; - for (; i < numRows; ++i) { - char* pNextVal = colDataGetData(pCol, i); - if (timestamp != *(int64_t*)pNextVal) { - break; - } +static void mJoinDestroyCreatedBlks(SArray* pCreatedBlks) { + int32_t blkNum = taosArrayGetSize(pCreatedBlks); + for (int32_t i = 0; i < blkNum; ++i) { + blockDataDestroy(*(SSDataBlock**)TARRAY_GET_ELEM(pCreatedBlks, i)); } - int32_t endPos = i; - *pEndPos = endPos; - - if (endPos - startPos == 0) { - return 0; - } - - SSDataBlock* block = pBlock; - bool createdNewBlock = false; - if (endPos == numRows) { - block = blockDataExtractBlock(pBlock, startPos, endPos - startPos); - taosArrayPush(createdBlocks, &block); - createdNewBlock = true; - } - SRowLocation location = {0}; - for (int32_t j = startPos; j < endPos; ++j) { - location.pDataBlock = block; - location.pos = (createdNewBlock ? j - startPos : j); - taosArrayPush(rowLocations, &location); - } - return 0; + taosArrayClear(pCreatedBlks); } - -static void mJoinBuildEqGroups(SOperatorInfo* pOperator, SMJoinTableInfo* pTable, int64_t timestamp, bool* wholeBlk, bool restart) { +static void mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) { SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); SMJoinGrpRows* pGrp = NULL; if (restart) { pTable->grpTotalRows = 0; pTable->grpIdx = 0; + mJoinDestroyCreatedBlks(pTable->createdBlks); pGrp = taosArrayGet(pTable->eqGrps, 0); } else { pGrp = taosArrayReserve(pTable->eqGrps, 1); @@ -675,7 +557,7 @@ static void mJoinBuildEqGroups(SOperatorInfo* pOperator, SMJoinTableInfo* pTable } -static int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableInfo* pTable, int64_t timestamp) { +static int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableCtx* pTable, int64_t timestamp) { bool wholeBlk = false; mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, true); @@ -698,7 +580,7 @@ static int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableInfo* return TSDB_CODE_SUCCESS; } -static int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableInfo* pTable) { +static int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable) { for (int32_t i = 0; i < pTable->keyNum; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot); if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { @@ -719,7 +601,7 @@ static int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableInfo* pTable) return TSDB_CODE_SUCCESS; } -static FORCE_INLINE true mJoinCopyKeyColsDataToBuf(SMJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) { +static FORCE_INLINE bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen) { char *pData = NULL; size_t bufLen = 0; @@ -760,7 +642,7 @@ static FORCE_INLINE true mJoinCopyKeyColsDataToBuf(SMJoinTableInfo* pTable, int3 return false; } -static int32_t mJoinGetAvailableGrpArray(SMJoinTableInfo* pTable, SArray** ppRes) { +static int32_t mJoinGetAvailableGrpArray(SMJoinTableCtx* pTable, SArray** ppRes) { do { if (pTable->grpArrayIdx < taosArrayGetSize(pTable->pGrpArrays)) { *ppRes = taosArrayGetP(pTable->pGrpArrays, pTable->grpArrayIdx++); @@ -779,7 +661,7 @@ static int32_t mJoinGetAvailableGrpArray(SMJoinTableInfo* pTable, SArray** ppRes } static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDataBlock* pBlock, int32_t rowIdx) { - SMJoinTableInfo* pBuild = pJoin->build; + SMJoinTableCtx* pBuild = pJoin->build; SMJoinRowPos pos = {pBlock, rowIdx}; SArray** pGrpRows = tSimpleHashGet(pBuild->pGrpHash, pBuild->keyData, keyLen); if (!pGrpRows) { @@ -796,7 +678,7 @@ static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDat } -static int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableInfo* pTable) { +static int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) { size_t bufLen = 0; tSimpleHashClear(pJoin->build->pGrpHash); @@ -820,7 +702,7 @@ static int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableInfo* return TSDB_CODE_SUCCESS; } -static bool mLeftJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableInfo* probe, SMJoinTableInfo* build) { +static bool mLeftJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build) { int32_t rowsLeft = append ? (pBlk->info.capacity - pBlk->info.rows) : pBlk->info.capacity; if (rowsLeft <= 0) { return false; @@ -837,7 +719,7 @@ static bool mLeftJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, boo for (int32_t c = 0; c < probe->finNum; ++c) { SMJoinColMap* pFirstCol = probe->finCols + c; - SColumnInfoData* pInCol = taosArrayGet(probeGrp->blk, pFirstCol->srcSlot); + SColumnInfoData* pInCol = taosArrayGet(probeGrp->blk->pDataBlock, pFirstCol->srcSlot); SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pFirstCol->dstSlot); if (colDataIsNull_s(pInCol, probeGrp->readIdx)) { colDataSetNItemsNull(pOutCol, currRows, actRows); @@ -851,7 +733,7 @@ static bool mLeftJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, boo SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pSecondCol->dstSlot); for (int32_t r = 0; r < actRows; ++r) { SMJoinRowPos* pRow = taosArrayGet(build->pHashCurGrp, r); - SColumnInfoData* pInCol = taosArrayGet(pRow->pBlk, pSecondCol->srcSlot); + SColumnInfoData* pInCol = taosArrayGet(pRow->pBlk->pDataBlock, pSecondCol->srcSlot); colDataAssignNRows(pOutCol, currRows + r, pInCol, pRow->pos, 1); } } @@ -872,8 +754,8 @@ static bool mLeftJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, boo } static int32_t mLeftJoinHashFullCart(SMJoinMergeCtx* pCtx) { - SMJoinTableInfo* probe = pCtx->pJoin->probe; - SMJoinTableInfo* build = pCtx->pJoin->build; + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); if (build->grpRowIdx >= 0) { @@ -916,8 +798,8 @@ _return: } static int32_t mLeftJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop) { - SMJoinTableInfo* probe = pCtx->pJoin->probe; - SMJoinTableInfo* build = pCtx->pJoin->build; + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); blockDataReset(pCtx->midBlk); @@ -970,8 +852,8 @@ static int32_t mLeftJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop) static int32_t mLeftJoinHashSeqCart(SMJoinMergeCtx* pCtx) { - SMJoinTableInfo* probe = pCtx->pJoin->probe; - SMJoinTableInfo* build = pCtx->pJoin->build; + SMJoinTableCtx* probe = pCtx->pJoin->probe; + SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0); bool contLoop = false; @@ -1039,14 +921,14 @@ static int32_t mLeftJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, pCtx->hashJoin = true; - return mLeftJoinHashCart(pJoin, pCtx); + return mLeftJoinHashCart(pCtx); } - return mLeftJoinMergeCart(pJoin, pCtx); + return mLeftJoinMergeCart(pCtx); } static bool mLeftJoinHandleMidRemains(SMJoinMergeCtx* pCtx) { - ASSERT(0 < pCtx->midBlk.info.rows); + ASSERT(0 < pCtx->midBlk->info.rows); TSWAP(pCtx->midBlk, pCtx->finBlk); @@ -1161,16 +1043,46 @@ _return: return pCtx->finBlk; } +void mJoinResetTableCtx(SMJoinTableCtx* pCtx) { + pCtx->dsInitDone = false; + pCtx->dsFetchDone = false; + mJoinDestroyCreatedBlks(pCtx->createdBlks); + tSimpleHashClear(pCtx->pGrpHash); +} + +void mJoinResetMergeCtx(SMJoinMergeCtx* pCtx) { + pCtx->grpRemains = false; + pCtx->midRemains = false; + pCtx->lastEqGrp = false; + + pCtx->lastEqTs = INT64_MIN; + pCtx->hashJoin = false; +} + +void mJoinResetCtx(SMJoinOperatorInfo* pJoin) { + mJoinResetMergeCtx(&pJoin->ctx.mergeCtx); +} + +void mJoinResetOperator(struct SOperatorInfo* pOperator) { + SMJoinOperatorInfo* pJoin = pOperator->info; + + mJoinResetTableCtx(pJoin->build); + mJoinResetTableCtx(pJoin->probe); + + mJoinResetCtx(pJoin); + + pOperator->status = OP_OPENED; +} SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoin = pOperator->info; if (pOperator->status == OP_EXEC_DONE) { if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] || NULL == pOperator->pDownstreamGetParams[1]) { - qDebug("%s total merge join res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pJoin->resRows); + qDebug("%s merge join done", GET_TASKID(pOperator->pTaskInfo)); return NULL; } else { - resetMergeJoinOperator(pOperator); + mJoinResetOperator(pOperator); qDebug("%s start new round merge join", GET_TASKID(pOperator->pTaskInfo)); } } @@ -1207,6 +1119,13 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { } +void destroyMergeJoinOperator(void* param) { + SMJoinOperatorInfo* pJoinOperator = (SMJoinOperatorInfo*)param; + + taosMemoryFreeClear(param); +} + + SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo)); @@ -1216,14 +1135,11 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t int32_t code = TSDB_CODE_SUCCESS; if (pOperator == NULL || pInfo == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; + goto _return; } pInfo->pOperator = pOperator; - code = mJoinInitDownstreamInfo(pInfo, pDownstream, numOfDownstream, newDownstreams); - if (TSDB_CODE_SUCCESS != code) { - goto _error; - } + MJ_ERR_JRET(mJoinInitDownstreamInfo(pInfo, pDownstream, &numOfDownstream, &newDownstreams)); setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -1231,39 +1147,19 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); - MJ_ERR_JRET(mJoinInitCtx(pInfo, pJoinNode)); - code = mJoinBuildResColMap(pInfo, pJoinNode); - if (code) { - goto _error; - } - - code = initHJoinBufPages(pInfo); - if (code) { - goto _error; - } - if (pJoinNode->pFullOnCond != NULL) { - code = filterInitFromNode(pJoinNode->pFullOnCond, &pInfo->pFPreFilter, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + MJ_ERR_JRET(filterInitFromNode(pJoinNode->pFullOnCond, &pInfo->pFPreFilter, 0)); } if (pJoinNode->pColOnCond != NULL) { - code = filterInitFromNode(pJoinNode->pColOnCond, &pInfo->pPreFilter, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + MJ_ERR_JRET(filterInitFromNode(pJoinNode->pColOnCond, &pInfo->pPreFilter, 0)); } if (pJoinNode->node.pConditions != NULL) { - code = filterInitFromNode(pJoinNode->node.pConditions, &pInfo->pFinFilter, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + MJ_ERR_JRET(filterInitFromNode(pJoinNode->node.pConditions, &pInfo->pFinFilter, 0)); } if (pJoinNode->node.inputTsOrder == ORDER_ASC) { @@ -1276,10 +1172,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mJoinMainProcess, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); - code = appendDownstream(pOperator, pDownstream, numOfDownstream); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + MJ_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream)); + if (newDownstreams) { taosMemoryFree(pDownstream); pOperator->numOfRealDownstream = 1; @@ -1289,7 +1183,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t return pOperator; -_error: +_return: if (pInfo != NULL) { destroyMergeJoinOperator(pInfo); } @@ -1302,24 +1196,3 @@ _error: return NULL; } -void destroyMergeJoinOperator(void* param) { - SMJoinOperatorInfo* pJoinOperator = (SMJoinOperatorInfo*)param; - if (pJoinOperator->pColEqualOnConditions != NULL) { - mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable); - taosMemoryFreeClear(pJoinOperator->rightEqOnCondKeyBuf); - taosArrayDestroy(pJoinOperator->rightEqOnCondCols); - - taosMemoryFreeClear(pJoinOperator->leftEqOnCondKeyBuf); - taosArrayDestroy(pJoinOperator->leftEqOnCondCols); - } - nodesDestroyNode(pJoinOperator->pCondAfterMerge); - - taosArrayDestroy(pJoinOperator->rowCtx.leftCreatedBlocks); - taosArrayDestroy(pJoinOperator->rowCtx.rightCreatedBlocks); - taosArrayDestroy(pJoinOperator->rowCtx.leftRowLocations); - taosArrayDestroy(pJoinOperator->rowCtx.rightRowLocations); - - pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); - taosMemoryFreeClear(param); -} - diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 19bac5eb00..2ab5816184 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2072,15 +2072,24 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) { } static const char* jkJoinPhysiPlanJoinType = "JoinType"; +static const char* jkJoinPhysiPlanSubType = "SubType"; +static const char* jkJoinPhysiPlanWinOffset = "WindowOffset"; +static const char* jkJoinPhysiPlanJoinLimit = "JoinLimit"; +static const char* jkJoinPhysiPlanLeftPrimSlotId = "LeftPrimSlotId"; +static const char* jkJoinPhysiPlanRightPrimSlotId = "RightPrimSlotId"; +static const char* jkJoinPhysiPlanLeftEqCols = "LeftEqCols"; +static const char* jkJoinPhysiPlanRightEqCols = "RightEqCols"; static const char* jkJoinPhysiPlanInputTsOrder = "InputTsOrder"; static const char* jkJoinPhysiPlanOnLeftCols = "OnLeftColumns"; static const char* jkJoinPhysiPlanOnRightCols = "OnRightColumns"; static const char* jkJoinPhysiPlanPrimKeyCondition = "PrimKeyCondition"; static const char* jkJoinPhysiPlanOnConditions = "OnConditions"; static const char* jkJoinPhysiPlanTargets = "Targets"; -static const char* jkJoinPhysiPlanColEqualOnConditions = "ColumnEqualOnConditions"; -static const char* jkJoinPhysiPlanInputRowNum = "InputRowNum"; -static const char* jkJoinPhysiPlanInputRowSize = "InputRowSize"; +static const char* jkJoinPhysiPlanColOnConditions = "ColumnOnConditions"; +static const char* jkJoinPhysiPlanLeftInputRowNum = "LeftInputRowNum"; +static const char* jkJoinPhysiPlanRightInputRowNum = "RightInputRowNum"; +static const char* jkJoinPhysiPlanLeftInputRowSize = "LeftInputRowSize"; +static const char* jkJoinPhysiPlanRightInputRowSize = "RightInputRowSize"; static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) { const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj; @@ -2090,7 +2099,25 @@ static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddObject(pJson, jkJoinPhysiPlanPrimKeyCondition, nodeToJson, pNode->pPrimKeyCond); + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanSubType, pNode->subType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinPhysiPlanWinOffset, nodeToJson, pNode->pWindowOffset); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinPhysiPlanJoinLimit, nodeToJson, pNode->pJLimit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanLeftPrimSlotId, pNode->leftPrimSlotId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanRightPrimSlotId, pNode->rightPrimSlotId); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkJoinPhysiPlanLeftEqCols, pNode->pEqLeft); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkJoinPhysiPlanRightEqCols, pNode->pEqRight); } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pFullOnCond); @@ -2099,8 +2126,21 @@ static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) { code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddObject(pJson, jkJoinPhysiPlanColEqualOnConditions, nodeToJson, pNode->pColEqCond); + code = tjsonAddObject(pJson, jkJoinPhysiPlanColOnConditions, nodeToJson, pNode->pColOnCond); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanLeftInputRowNum, pNode->inputStat[0].inputRowNum); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanLeftInputRowSize, pNode->inputStat[0].inputRowSize); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanRightInputRowNum, pNode->inputStat[1].inputRowNum); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanRightInputRowSize, pNode->inputStat[1].inputRowSize); + } + return code; } @@ -2112,17 +2152,48 @@ static int32_t jsonToPhysiMergeJoinNode(const SJson* pJson, void* pObj) { tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pFullOnCond); + tjsonGetNumberValue(pJson, jkJoinPhysiPlanSubType, pNode->subType, code); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeObject(pJson, jkJoinPhysiPlanPrimKeyCondition, &pNode->pPrimKeyCond); + code = jsonToNodeObject(pJson, jkJoinPhysiPlanWinOffset, &pNode->pWindowOffset); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinPhysiPlanJoinLimit, &pNode->pJLimit); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkJoinPhysiPlanLeftPrimSlotId, pNode->leftPrimSlotId, code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkJoinPhysiPlanRightPrimSlotId, pNode->rightPrimSlotId, code); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkJoinPhysiPlanLeftEqCols, &pNode->pEqLeft); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkJoinPhysiPlanRightEqCols, &pNode->pEqRight); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pFullOnCond); } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeObject(pJson, jkJoinPhysiPlanColEqualOnConditions, &pNode->pColEqCond); + code = jsonToNodeObject(pJson, jkJoinPhysiPlanColOnConditions, &pNode->pColOnCond); } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkJoinPhysiPlanLeftInputRowNum, pNode->inputStat[0].inputRowNum, code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkJoinPhysiPlanLeftInputRowSize, pNode->inputStat[0].inputRowSize, code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkJoinPhysiPlanRightInputRowNum, pNode->inputStat[1].inputRowNum, code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkJoinPhysiPlanRightInputRowSize, pNode->inputStat[1].inputRowSize, code); + } + return code; } @@ -2146,16 +2217,16 @@ static int32_t physiHashJoinNodeToJson(const void* pObj, SJson* pJson) { code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanInputRowNum, pNode->inputStat[0].inputRowNum); + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanLeftInputRowNum, pNode->inputStat[0].inputRowNum); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanInputRowSize, pNode->inputStat[0].inputRowSize); + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanLeftInputRowSize, pNode->inputStat[0].inputRowSize); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanInputRowNum, pNode->inputStat[1].inputRowNum); + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanRightInputRowNum, pNode->inputStat[1].inputRowNum); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanInputRowSize, pNode->inputStat[1].inputRowSize); + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanRightInputRowSize, pNode->inputStat[1].inputRowSize); } return code; } @@ -2181,16 +2252,16 @@ static int32_t jsonToPhysiHashJoinNode(const SJson* pJson, void* pObj) { code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets); } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkJoinPhysiPlanInputRowNum, pNode->inputStat[0].inputRowNum, code); + tjsonGetNumberValue(pJson, jkJoinPhysiPlanLeftInputRowNum, pNode->inputStat[0].inputRowNum, code); } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkJoinPhysiPlanInputRowSize, pNode->inputStat[0].inputRowSize, code); + tjsonGetNumberValue(pJson, jkJoinPhysiPlanLeftInputRowSize, pNode->inputStat[0].inputRowSize, code); } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkJoinPhysiPlanInputRowNum, pNode->inputStat[1].inputRowNum, code); + tjsonGetNumberValue(pJson, jkJoinPhysiPlanRightInputRowNum, pNode->inputStat[1].inputRowNum, code); } if (TSDB_CODE_SUCCESS == code) { - tjsonGetNumberValue(pJson, jkJoinPhysiPlanInputRowSize, pNode->inputStat[1].inputRowSize, code); + tjsonGetNumberValue(pJson, jkJoinPhysiPlanRightInputRowSize, pNode->inputStat[1].inputRowSize, code); } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 7a5b01c182..618cb69b83 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2400,11 +2400,20 @@ static int32_t msgToPhysiProjectNode(STlvDecoder* pDecoder, void* pObj) { enum { PHY_SORT_MERGE_JOIN_CODE_BASE_NODE = 1, PHY_SORT_MERGE_JOIN_CODE_JOIN_TYPE, - PHY_SORT_MERGE_JOIN_CODE_PRIM_KEY_CONDITION, - PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS, + PHY_SORT_MERGE_JOIN_CODE_SUB_TYPE, + PHY_SORT_MERGE_JOIN_CODE_WINDOW_OFFSET, + PHY_SORT_MERGE_JOIN_CODE_JOIN_LIMIT, + PHY_SORT_MERGE_JOIN_CODE_LEFT_PRIM_SLOT_ID, + PHY_SORT_MERGE_JOIN_CODE_RIGHT_PRIM_SLOT_ID, + PHY_SORT_MERGE_JOIN_CODE_LEFT_EQ_COLS, + PHY_SORT_MERGE_JOIN_CODE_RIGHT_EQ_COLS, + PHY_SORT_MERGE_JOIN_CODE_FULL_ON_CONDITIONS, PHY_SORT_MERGE_JOIN_CODE_TARGETS, - PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER, - PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS + PHY_SORT_MERGE_JOIN_CODE_COL_ON_CONDITIONS, + PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM0, + PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE0, + PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM1, + PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1 }; static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -2415,17 +2424,48 @@ static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) code = tlvEncodeEnum(pEncoder, PHY_SORT_MERGE_JOIN_CODE_JOIN_TYPE, pNode->joinType); } if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_PRIM_KEY_CONDITION, nodeToMsg, pNode->pPrimKeyCond); + code = tlvEncodeEnum(pEncoder, PHY_SORT_MERGE_JOIN_CODE_SUB_TYPE, pNode->subType); } if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS, nodeToMsg, pNode->pFullOnCond); + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_WINDOW_OFFSET, nodeToMsg, pNode->pWindowOffset); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_JOIN_LIMIT, nodeToMsg, pNode->pJLimit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_SORT_MERGE_JOIN_CODE_LEFT_PRIM_SLOT_ID, pNode->leftPrimSlotId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_SORT_MERGE_JOIN_CODE_RIGHT_PRIM_SLOT_ID, pNode->rightPrimSlotId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_LEFT_EQ_COLS, nodeListToMsg, pNode->pEqLeft); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_RIGHT_EQ_COLS, nodeListToMsg, pNode->pEqRight); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_COL_ON_CONDITIONS, nodeToMsg, pNode->pColOnCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_FULL_ON_CONDITIONS, nodeToMsg, pNode->pFullOnCond); } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TARGETS, nodeListToMsg, pNode->pTargets); } if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS, nodeToMsg, pNode->pColEqCond); + code = tlvEncodeI64(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM0, pNode->inputStat[0].inputRowNum); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM1, pNode->inputStat[1].inputRowNum); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE0, pNode->inputStat[0].inputRowSize); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1, pNode->inputStat[1].inputRowSize); + } + return code; } @@ -2442,17 +2482,47 @@ static int32_t msgToPhysiMergeJoinNode(STlvDecoder* pDecoder, void* pObj) { case PHY_SORT_MERGE_JOIN_CODE_JOIN_TYPE: code = tlvDecodeEnum(pTlv, &pNode->joinType, sizeof(pNode->joinType)); break; - case PHY_SORT_MERGE_JOIN_CODE_PRIM_KEY_CONDITION: - code = msgToNodeFromTlv(pTlv, (void**)&pNode->pPrimKeyCond); + case PHY_SORT_MERGE_JOIN_CODE_SUB_TYPE: + code = tlvDecodeEnum(pTlv, &pNode->subType, sizeof(pNode->subType)); break; - case PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS: + case PHY_SORT_MERGE_JOIN_CODE_WINDOW_OFFSET: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pWindowOffset); + break; + case PHY_SORT_MERGE_JOIN_CODE_JOIN_LIMIT: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pJLimit); + break; + case PHY_SORT_MERGE_JOIN_CODE_LEFT_PRIM_SLOT_ID: + code = tlvDecodeI32(pTlv, &pNode->leftPrimSlotId); + break; + case PHY_SORT_MERGE_JOIN_CODE_RIGHT_PRIM_SLOT_ID: + code = tlvDecodeI32(pTlv, &pNode->rightPrimSlotId); + break; + case PHY_SORT_MERGE_JOIN_CODE_LEFT_EQ_COLS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pEqLeft); + break; + case PHY_SORT_MERGE_JOIN_CODE_RIGHT_EQ_COLS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pEqRight); + break; + case PHY_SORT_MERGE_JOIN_CODE_COL_ON_CONDITIONS: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pColOnCond); + break; + case PHY_SORT_MERGE_JOIN_CODE_FULL_ON_CONDITIONS: code = msgToNodeFromTlv(pTlv, (void**)&pNode->pFullOnCond); break; case PHY_SORT_MERGE_JOIN_CODE_TARGETS: code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); break; - case PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS: - code = msgToNodeFromTlv(pTlv, (void**)&pNode->pColEqCond); + case PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM0: + code = tlvDecodeI64(pTlv, &pNode->inputStat[0].inputRowNum); + break; + case PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM1: + code = tlvDecodeI64(pTlv, &pNode->inputStat[1].inputRowNum); + break; + case PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE0: + code = tlvDecodeI32(pTlv, &pNode->inputStat[0].inputRowSize); + break; + case PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1: + code = tlvDecodeI32(pTlv, &pNode->inputStat[1].inputRowSize); break; default: break; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 7c5ea785ef..9978df597d 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1339,10 +1339,15 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { SSortMergeJoinPhysiNode* pPhyNode = (SSortMergeJoinPhysiNode*)pNode; destroyPhysiNode((SPhysiNode*)pPhyNode); + nodesDestroyNode(pPhyNode->pWindowOffset); + nodesDestroyNode(pPhyNode->pJLimit); + nodesDestroyList(pPhyNode->pEqLeft); + nodesDestroyList(pPhyNode->pEqRight); nodesDestroyNode(pPhyNode->pPrimKeyCond); nodesDestroyNode(pPhyNode->pFullOnCond); nodesDestroyList(pPhyNode->pTargets); nodesDestroyNode(pPhyNode->pColEqCond); + nodesDestroyNode(pPhyNode->pColOnCond); break; } case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN: { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 02ded720b8..894f020d75 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -760,6 +760,71 @@ static int32_t getJoinDataBlockDescNode(SNodeList* pChildren, int32_t idx, SData return TSDB_CODE_SUCCESS; } +static int32_t setColEqList(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkId, SNodeList** ppLeft, SNodeList** ppRight) { + if (QUERY_NODE_OPERATOR == nodeType(pEqCond) && ((SOperatorNode*)pEqCond)->opType == OP_TYPE_EQUAL) { + SOperatorNode* pOp = (SOperatorNode*)pEqCond; + if (leftBlkId == ((SColumnNode*)pOp->pLeft)->dataBlockId) { + nodesListMakeStrictAppend(ppLeft, nodesCloneNode(pOp->pLeft)); + } else if (rightBlkId == ((SColumnNode*)pOp->pLeft)->dataBlockId) { + nodesListMakeStrictAppend(ppRight, nodesCloneNode(pOp->pLeft)); + } else { + planError("invalid col equal list, leftBlockId:%d", ((SColumnNode*)pOp->pLeft)->dataBlockId); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + + if (leftBlkId == ((SColumnNode*)pOp->pRight)->dataBlockId) { + nodesListMakeStrictAppend(ppLeft, nodesCloneNode(pOp->pLeft)); + } else if (rightBlkId == ((SColumnNode*)pOp->pRight)->dataBlockId) { + nodesListMakeStrictAppend(ppRight, nodesCloneNode(pOp->pLeft)); + } else { + planError("invalid col equal list, rightBlockId:%d", ((SColumnNode*)pOp->pRight)->dataBlockId); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + } else if (QUERY_NODE_LOGIC_CONDITION == nodeType(pEqCond) && ((SLogicConditionNode*)pEqCond)->condType == LOGIC_COND_TYPE_AND) { + SLogicConditionNode* pLogic = (SLogicConditionNode*)pEqCond; + SNode* pNode = NULL; + FOREACH(pNode, pLogic->pParameterList) { + int32_t code = setColEqList(pNode, leftBlkId, rightBlkId, ppLeft, ppRight); + if (code) { + return code; + } + } + } else { + planError("invalid col equal cond, type:%d", nodeType(pEqCond)); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t setColEqCond(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkId, int32_t* pLeftId, int32_t* pRightId) { + if (QUERY_NODE_OPERATOR == nodeType(pEqCond) && ((SOperatorNode*)pEqCond)->opType == OP_TYPE_EQUAL) { + SOperatorNode* pOp = (SOperatorNode*)pEqCond; + if (leftBlkId == ((SColumnNode*)pOp->pLeft)->dataBlockId) { + *pLeftId = ((SColumnNode*)pOp->pLeft)->slotId; + } else if (rightBlkId == ((SColumnNode*)pOp->pLeft)->dataBlockId) { + *pRightId = ((SColumnNode*)pOp->pLeft)->slotId; + } else { + planError("invalid primary key col equal cond, leftBlockId:%d", ((SColumnNode*)pOp->pLeft)->dataBlockId); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + + if (leftBlkId == ((SColumnNode*)pOp->pRight)->dataBlockId) { + *pLeftId = ((SColumnNode*)pOp->pRight)->slotId; + } else if (rightBlkId == ((SColumnNode*)pOp->pRight)->dataBlockId) { + *pRightId = ((SColumnNode*)pOp->pRight)->slotId; + } else { + planError("invalid primary key col equal cond, rightBlockId:%d", ((SColumnNode*)pOp->pRight)->dataBlockId); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + } else { + planError("invalid primary key col equal cond, type:%d", nodeType(pEqCond)); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + + return TSDB_CODE_SUCCESS; +} + static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, SPhysiNode** pPhyNode) { SSortMergeJoinPhysiNode* pJoin = @@ -784,6 +849,9 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi if (TSDB_CODE_SUCCESS == code) { code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pJoin->pPrimKeyCond); + if (TSDB_CODE_SUCCESS == code) { + code = setColEqCond(pJoin->pPrimKeyCond, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, &pJoin->leftPrimSlotId, &pJoin->rightPrimSlotId); + } } if (TSDB_CODE_SUCCESS == code) { @@ -806,11 +874,13 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi } */ if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) { - code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqCond); + if (TSDB_CODE_SUCCESS == code) { + code = setColEqList(pJoin->pColEqCond, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, &pJoin->pEqLeft, &pJoin->pEqRight); + } } - if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColOnCond) || (NULL != pJoinLogicNode->pTagOnCond))) { code = mergeEqCond(&pJoinLogicNode->pColOnCond, &pJoinLogicNode->pTagOnCond); }