From dcfa6c220e5c80420f8b7a72f73cd236049fc14d Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 18 Jan 2024 17:09:59 +0800 Subject: [PATCH] fix: compile issues --- source/libs/executor/inc/mergejoin.h | 8 +-- source/libs/executor/src/mergejoin.c | 69 +++++++------------- source/libs/executor/src/mergejoinoperator.c | 13 ++-- source/libs/planner/src/planPhysiCreater.c | 8 +-- 4 files changed, 38 insertions(+), 60 deletions(-) diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 0e04bf7c0c..e8653c3022 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -231,7 +231,6 @@ typedef struct SMJoinWindowCtx { bool asofLowerRow; bool asofEqRow; bool asofGreaterRow; - int64_t jLimit; bool eqPostDone; int64_t lastTs; @@ -314,8 +313,8 @@ typedef struct SMJoinOperatorInfo { #define MJOIN_PUSH_BLK_TO_CACHE(_cache, _blk) \ do { \ - ASSERT(taosArrayGetSize(_cache)->grps <= 1); \ - SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayReserve(_cache)->grps, 1); \ + ASSERT(taosArrayGetSize((_cache)->grps) <= 1); \ + SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayReserve((_cache)->grps, 1); \ (_cache)->rowNum += (_blk)->info.rows; \ pGrp->blk = (_blk); \ pGrp->beginIdx = 0; \ @@ -379,6 +378,7 @@ typedef struct SMJoinOperatorInfo { } while (0) +int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode); int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode); SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator); SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator); @@ -398,7 +398,7 @@ bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, S int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond); int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx); int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp, bool probeGrp); -int32_t mJoinNonEqCart(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp); +int32_t mJoinNonEqCart(SMJoinCommonCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp); int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin); int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startGrpIdx, int32_t startRowIdx); int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startRowIdx); diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index 6f4748b875..f6a5307c06 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -419,7 +419,7 @@ static FORCE_INLINE int32_t mLeftJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx); } - return mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true); + return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true); } SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { @@ -505,7 +505,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; - MJ_ERR_JRET(mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true)); + MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -742,7 +742,7 @@ static FORCE_INLINE int32_t mFullJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx); } - return pCtx->lastProbeGrp ? mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true) : mJoinNonEqCart(pCtx, &pCtx->buildNEqGrp, false); + return pCtx->lastProbeGrp ? mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true) : mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false); } static bool mFullJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { @@ -1100,7 +1100,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; - MJ_ERR_JRET(mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true)); + MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -1121,7 +1121,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; - MJ_ERR_JRET(mJoinNonEqCart(pCtx, &pCtx->buildNEqGrp, false)); + MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -1442,7 +1442,7 @@ static FORCE_INLINE int32_t mAntiJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx); } - return mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true); + return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true); } static int32_t mAntiJoinHashFullCart(SMJoinMergeCtx* pCtx) { @@ -1696,7 +1696,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) { pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; - MJ_ERR_JRET(mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true)); + MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -1787,7 +1787,7 @@ int32_t mAsofJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* who continue; } - return TSDB_CODE_SUCCESS + return TSDB_CODE_SUCCESS; } } @@ -1802,29 +1802,6 @@ int32_t mAsofJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* who } - -int32_t mAsofJoinProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { - pCtx->buildGrp.beginIdx = pJoin->build->blkRowIdx; - pCtx->buildGrp.readIdx = pCtx->buildGrp.beginIdx; - pCtx->buildGrp.endIdx = pCtx->buildGrp.beginIdx; - - while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { - MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pJoin->build); - if (PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) { - pCtx->buildGrp.endIdx = pJoin->build->blkRowIdx; - continue; - } - - break; - } - - pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx; - pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx; - pCtx->probeGrp.endIdx = pCtx->probeGrp.beginIdx; - - return mAsofJoinHandleProbeGreater(pCtx); -} - int32_t mAsofJoinAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp) { int64_t eqRowsNum = 0; SMJoinGrpRows grp = {.blk = pTable->blk}; @@ -1881,12 +1858,10 @@ int32_t mAsofJoinAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindowC int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) { if (pCtx->cache.outBlk->info.rows <= 0) { - return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, probeGrp, true); + return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true); } int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; - SMJoinTableCtx* probe = pCtx->pJoin->probe; - SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = &pCtx->probeGrp; SMJoinGrpRows buildGrp = {.blk = pCtx->cache.outBlk, .readIdx = pCtx->cache.outRowIdx, .endIdx = pCtx->cache.outBlk->info.rows - 1}; int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); @@ -1906,7 +1881,7 @@ int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) { MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp)); if (++probeGrp->readIdx <= probeEndIdx) { probeGrp->endIdx = probeEndIdx; - buildGrp->readIdx = 0; + buildGrp.readIdx = 0; MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp)); } @@ -1920,7 +1895,7 @@ int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) { if (grpNum > 0) { probeGrp->endIdx = probeGrp->readIdx + grpNum - 1; buildGrp.readIdx = 0; - MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp)); rowsLeft -= grpNum * pCtx->cache.outBlk->info.rows; pCtx->cache.outRowIdx = 0; probeGrp->readIdx += grpNum; @@ -1933,14 +1908,14 @@ int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) { int32_t grpRemainRows = pCtx->cache.outBlk->info.rows - pCtx->cache.outRowIdx; if (rowsLeft >= grpRemainRows) { - MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp)); rowsLeft -= grpRemainRows; pCtx->cache.outRowIdx = 0; continue; } - buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1; - mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp); + buildGrp.endIdx = buildGrp.readIdx + rowsLeft - 1; + mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp); pCtx->cache.outRowIdx += rowsLeft; break; } @@ -1980,7 +1955,7 @@ int32_t mAsofLowerProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool pCtx->lastEqGrp = true; pCtx->eqPostDone = false; - MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, &pCtx->probeGrp)); + MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, NULL, &pCtx->probeGrp)); return mAsofLowerDumpUpdateEqRows(pCtx, pJoin, lastBuildGrp); } @@ -2116,7 +2091,7 @@ SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) { if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) { MJ_ERR_JRET(mAsofLowerProcessLowerGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs)); } else { - MJ_ERR_JRET(mAsofLowerProcessGreaterGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs)); + MJ_ERR_JRET(mAsofLowerProcessGreaterGrp(pCtx, pJoin, pBuildCol, &probeTs, &buildTs)); } if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { @@ -2283,7 +2258,7 @@ int32_t mAsofGreaterSkipEqRows(SMJoinTableCtx* pTable, int64_t timestamp, bool* continue; } - return TSDB_CODE_SUCCESS + return TSDB_CODE_SUCCESS; } } @@ -2341,7 +2316,7 @@ int32_t mAsofGreaterProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bo pCtx->lastEqGrp = true; pCtx->cache.grpIdx = 0; - MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, &pCtx->probeGrp)); + MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, NULL, &pCtx->probeGrp)); return mAsofGreaterUpdateDumpEqRows(pCtx, timestamp, lastBuildGrp); } @@ -2492,7 +2467,7 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) { if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) { MJ_ERR_JRET(mAsofGreaterProcessLowerGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs)); } else { - MJ_ERR_JRET(mAsofGreaterProcessGreaterGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs)); + MJ_ERR_JRET(mAsofGreaterProcessGreaterGrp(pCtx, pJoin, pBuildCol, &probeTs, &buildTs)); } if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { @@ -2555,6 +2530,12 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p pCtx->asofGreaterRow = ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType); pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1; + if (pCtx->asofLowerRow) { + pJoin->joinFp = mAsofLowerJoinDo; + } else if (pCtx->asofGreaterRow) { + pJoin->joinFp = mAsofGreaterJoinDo; + } + if (pJoinNode->node.inputTsOrder != ORDER_DESC) { pCtx->ascTs = true; } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 14b7c8fde1..a028790524 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -588,7 +588,7 @@ int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastB return (*pCtx->mergeCartFp)(pCtx); } -int32_t mJoinProcessUnreachGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { +int32_t mJoinProcessLowerGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { pCtx->probeNEqGrp.blk = pTb->blk; pCtx->probeNEqGrp.beginIdx = pTb->blkRowIdx; pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx; @@ -604,7 +604,7 @@ int32_t mJoinProcessUnreachGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColum break; } - return mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true); + return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true); } int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { @@ -623,7 +623,7 @@ int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColum break; } - return mJoinNonEqCart(pCtx, &pCtx->buildNEqGrp, false); + return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false); } @@ -1365,11 +1365,8 @@ int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) { case JOIN_STYPE_ANTI: pJoin->joinFp = mAntiJoinDo; break; - case JOIN_STYPE_ASOF: - pJoin->joinFp = mAsofJoinDo; - break; case JOIN_STYPE_WIN: - pJoin->joinFp = mWinJoinDo; + //pJoin->joinFp = mWinJoinDo; break; default: break; @@ -1410,8 +1407,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); - MJ_ERR_JRET(mJoinSetImplFp(pInfo)); MJ_ERR_JRET(mJoinInitCtx(pInfo, pJoinNode)); + MJ_ERR_JRET(mJoinSetImplFp(pInfo)); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mJoinMainProcess, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 7299e03a2d..e4d1206205 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -792,10 +792,10 @@ static int32_t setColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId, if (leftBlkId == pCol->dataBlockId) { pJoin->leftPrimSlotId = pCol->slotId; pJoin->asofOpType = pOp->opType; - pJoin->leftPrimExpr = nodesCloneNode(pFunc); + pJoin->leftPrimExpr = nodesCloneNode((SNode*)pFunc); } else if (rightBlkId == pCol->dataBlockId) { pJoin->rightPrimSlotId = pCol->slotId; - pJoin->rightPrimExpr = nodesCloneNode(pFunc); + pJoin->rightPrimExpr = nodesCloneNode((SNode*)pFunc); } else { planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId); return TSDB_CODE_PLAN_INTERNAL_ERROR; @@ -836,10 +836,10 @@ static int32_t setColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId, if (leftBlkId == pCol->dataBlockId) { pJoin->leftPrimSlotId = pCol->slotId; pJoin->asofOpType = getAsofJoinReverseOp(pOp->opType); - pJoin->leftPrimExpr = nodesCloneNode(pFunc); + pJoin->leftPrimExpr = nodesCloneNode((SNode*)pFunc); } else if (rightBlkId == pCol->dataBlockId) { pJoin->rightPrimSlotId = pCol->slotId; - pJoin->rightPrimExpr = nodesCloneNode(pFunc); + pJoin->rightPrimExpr = nodesCloneNode((SNode*)pFunc); } else { planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId); return TSDB_CODE_PLAN_INTERNAL_ERROR;