fix: compile issues

This commit is contained in:
dapan1121 2024-01-18 17:09:59 +08:00
parent 97567f74cb
commit dcfa6c220e
4 changed files with 38 additions and 60 deletions

View File

@ -231,7 +231,6 @@ typedef struct SMJoinWindowCtx {
bool asofLowerRow;
bool asofEqRow;
bool asofGreaterRow;
int64_t jLimit;
bool eqPostDone;
int64_t lastTs;
@ -314,8 +313,8 @@ typedef struct SMJoinOperatorInfo {
#define MJOIN_PUSH_BLK_TO_CACHE(_cache, _blk) \
do { \
ASSERT(taosArrayGetSize(_cache)->grps <= 1); \
SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayReserve(_cache)->grps, 1); \
ASSERT(taosArrayGetSize((_cache)->grps) <= 1); \
SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayReserve((_cache)->grps, 1); \
(_cache)->rowNum += (_blk)->info.rows; \
pGrp->blk = (_blk); \
pGrp->beginIdx = 0; \
@ -379,6 +378,7 @@ typedef struct SMJoinOperatorInfo {
} while (0)
int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode);
int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode);
SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator);
SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator);
@ -398,7 +398,7 @@ bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, S
int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond);
int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx);
int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp, bool probeGrp);
int32_t mJoinNonEqCart(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp);
int32_t mJoinNonEqCart(SMJoinCommonCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp);
int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin);
int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startGrpIdx, int32_t startRowIdx);
int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startRowIdx);

View File

@ -419,7 +419,7 @@ static FORCE_INLINE int32_t mLeftJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx);
}
return mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true);
return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true);
}
SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
@ -505,7 +505,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
MJ_ERR_JRET(mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true));
MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
@ -742,7 +742,7 @@ static FORCE_INLINE int32_t mFullJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx);
}
return pCtx->lastProbeGrp ? mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true) : mJoinNonEqCart(pCtx, &pCtx->buildNEqGrp, false);
return pCtx->lastProbeGrp ? mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true) : mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false);
}
static bool mFullJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
@ -1100,7 +1100,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
MJ_ERR_JRET(mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true));
MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
@ -1121,7 +1121,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
MJ_ERR_JRET(mJoinNonEqCart(pCtx, &pCtx->buildNEqGrp, false));
MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
@ -1442,7 +1442,7 @@ static FORCE_INLINE int32_t mAntiJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx);
}
return mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true);
return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true);
}
static int32_t mAntiJoinHashFullCart(SMJoinMergeCtx* pCtx) {
@ -1696,7 +1696,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) {
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
MJ_ERR_JRET(mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true));
MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
@ -1787,7 +1787,7 @@ int32_t mAsofJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* who
continue;
}
return TSDB_CODE_SUCCESS
return TSDB_CODE_SUCCESS;
}
}
@ -1802,29 +1802,6 @@ int32_t mAsofJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* who
}
int32_t mAsofJoinProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) {
pCtx->buildGrp.beginIdx = pJoin->build->blkRowIdx;
pCtx->buildGrp.readIdx = pCtx->buildGrp.beginIdx;
pCtx->buildGrp.endIdx = pCtx->buildGrp.beginIdx;
while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pJoin->build);
if (PROBE_TS_GREATER(pCtx->ascTs, *probeTs, *buildTs)) {
pCtx->buildGrp.endIdx = pJoin->build->blkRowIdx;
continue;
}
break;
}
pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx;
pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx;
pCtx->probeGrp.endIdx = pCtx->probeGrp.beginIdx;
return mAsofJoinHandleProbeGreater(pCtx);
}
int32_t mAsofJoinAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp) {
int64_t eqRowsNum = 0;
SMJoinGrpRows grp = {.blk = pTable->blk};
@ -1881,12 +1858,10 @@ int32_t mAsofJoinAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindowC
int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) {
if (pCtx->cache.outBlk->info.rows <= 0) {
return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, probeGrp, true);
return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true);
}
int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = &pCtx->probeGrp;
SMJoinGrpRows buildGrp = {.blk = pCtx->cache.outBlk, .readIdx = pCtx->cache.outRowIdx, .endIdx = pCtx->cache.outBlk->info.rows - 1};
int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
@ -1906,7 +1881,7 @@ int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) {
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
if (++probeGrp->readIdx <= probeEndIdx) {
probeGrp->endIdx = probeEndIdx;
buildGrp->readIdx = 0;
buildGrp.readIdx = 0;
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
}
@ -1920,7 +1895,7 @@ int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) {
if (grpNum > 0) {
probeGrp->endIdx = probeGrp->readIdx + grpNum - 1;
buildGrp.readIdx = 0;
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
rowsLeft -= grpNum * pCtx->cache.outBlk->info.rows;
pCtx->cache.outRowIdx = 0;
probeGrp->readIdx += grpNum;
@ -1933,14 +1908,14 @@ int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) {
int32_t grpRemainRows = pCtx->cache.outBlk->info.rows - pCtx->cache.outRowIdx;
if (rowsLeft >= grpRemainRows) {
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
rowsLeft -= grpRemainRows;
pCtx->cache.outRowIdx = 0;
continue;
}
buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp);
buildGrp.endIdx = buildGrp.readIdx + rowsLeft - 1;
mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp);
pCtx->cache.outRowIdx += rowsLeft;
break;
}
@ -1980,7 +1955,7 @@ int32_t mAsofLowerProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool
pCtx->lastEqGrp = true;
pCtx->eqPostDone = false;
MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, &pCtx->probeGrp));
MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, NULL, &pCtx->probeGrp));
return mAsofLowerDumpUpdateEqRows(pCtx, pJoin, lastBuildGrp);
}
@ -2116,7 +2091,7 @@ SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) {
if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) {
MJ_ERR_JRET(mAsofLowerProcessLowerGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs));
} else {
MJ_ERR_JRET(mAsofLowerProcessGreaterGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs));
MJ_ERR_JRET(mAsofLowerProcessGreaterGrp(pCtx, pJoin, pBuildCol, &probeTs, &buildTs));
}
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
@ -2283,7 +2258,7 @@ int32_t mAsofGreaterSkipEqRows(SMJoinTableCtx* pTable, int64_t timestamp, bool*
continue;
}
return TSDB_CODE_SUCCESS
return TSDB_CODE_SUCCESS;
}
}
@ -2341,7 +2316,7 @@ int32_t mAsofGreaterProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bo
pCtx->lastEqGrp = true;
pCtx->cache.grpIdx = 0;
MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, &pCtx->probeGrp));
MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, NULL, &pCtx->probeGrp));
return mAsofGreaterUpdateDumpEqRows(pCtx, timestamp, lastBuildGrp);
}
@ -2492,7 +2467,7 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) {
if (PROBE_TS_LOWER(pCtx->ascTs, probeTs, buildTs)) {
MJ_ERR_JRET(mAsofGreaterProcessLowerGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs));
} else {
MJ_ERR_JRET(mAsofGreaterProcessGreaterGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs));
MJ_ERR_JRET(mAsofGreaterProcessGreaterGrp(pCtx, pJoin, pBuildCol, &probeTs, &buildTs));
}
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
@ -2555,6 +2530,12 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p
pCtx->asofGreaterRow = ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType);
pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1;
if (pCtx->asofLowerRow) {
pJoin->joinFp = mAsofLowerJoinDo;
} else if (pCtx->asofGreaterRow) {
pJoin->joinFp = mAsofGreaterJoinDo;
}
if (pJoinNode->node.inputTsOrder != ORDER_DESC) {
pCtx->ascTs = true;
}

View File

@ -588,7 +588,7 @@ int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastB
return (*pCtx->mergeCartFp)(pCtx);
}
int32_t mJoinProcessUnreachGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) {
int32_t mJoinProcessLowerGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) {
pCtx->probeNEqGrp.blk = pTb->blk;
pCtx->probeNEqGrp.beginIdx = pTb->blkRowIdx;
pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
@ -604,7 +604,7 @@ int32_t mJoinProcessUnreachGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColum
break;
}
return mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true);
return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true);
}
int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) {
@ -623,7 +623,7 @@ int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColum
break;
}
return mJoinNonEqCart(pCtx, &pCtx->buildNEqGrp, false);
return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false);
}
@ -1365,11 +1365,8 @@ int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) {
case JOIN_STYPE_ANTI:
pJoin->joinFp = mAntiJoinDo;
break;
case JOIN_STYPE_ASOF:
pJoin->joinFp = mAsofJoinDo;
break;
case JOIN_STYPE_WIN:
pJoin->joinFp = mWinJoinDo;
//pJoin->joinFp = mWinJoinDo;
break;
default:
break;
@ -1410,8 +1407,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]);
mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]);
MJ_ERR_JRET(mJoinSetImplFp(pInfo));
MJ_ERR_JRET(mJoinInitCtx(pInfo, pJoinNode));
MJ_ERR_JRET(mJoinSetImplFp(pInfo));
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mJoinMainProcess, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);

View File

@ -792,10 +792,10 @@ static int32_t setColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId,
if (leftBlkId == pCol->dataBlockId) {
pJoin->leftPrimSlotId = pCol->slotId;
pJoin->asofOpType = pOp->opType;
pJoin->leftPrimExpr = nodesCloneNode(pFunc);
pJoin->leftPrimExpr = nodesCloneNode((SNode*)pFunc);
} else if (rightBlkId == pCol->dataBlockId) {
pJoin->rightPrimSlotId = pCol->slotId;
pJoin->rightPrimExpr = nodesCloneNode(pFunc);
pJoin->rightPrimExpr = nodesCloneNode((SNode*)pFunc);
} else {
planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
@ -836,10 +836,10 @@ static int32_t setColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId,
if (leftBlkId == pCol->dataBlockId) {
pJoin->leftPrimSlotId = pCol->slotId;
pJoin->asofOpType = getAsofJoinReverseOp(pOp->opType);
pJoin->leftPrimExpr = nodesCloneNode(pFunc);
pJoin->leftPrimExpr = nodesCloneNode((SNode*)pFunc);
} else if (rightBlkId == pCol->dataBlockId) {
pJoin->rightPrimSlotId = pCol->slotId;
pJoin->rightPrimExpr = nodesCloneNode(pFunc);
pJoin->rightPrimExpr = nodesCloneNode((SNode*)pFunc);
} else {
planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId);
return TSDB_CODE_PLAN_INTERNAL_ERROR;