From 558d22dff449f1bb7194070a2e4288cc2670a126 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 19 Jul 2024 18:27:49 +0800 Subject: [PATCH] fix: function return code validation --- source/libs/executor/src/hashjoin.c | 4 +- source/libs/executor/src/hashjoinoperator.c | 84 +++++++++++++++----- source/libs/executor/src/mergejoin.c | 74 +++++++++++------ source/libs/executor/src/mergejoinoperator.c | 8 +- 4 files changed, 124 insertions(+), 46 deletions(-) diff --git a/source/libs/executor/src/hashjoin.c b/source/libs/executor/src/hashjoin.c index c22b331a16..d4a84afea2 100755 --- a/source/libs/executor/src/hashjoin.c +++ b/source/libs/executor/src/hashjoin.c @@ -90,7 +90,7 @@ int32_t hLeftJoinHandleSeqRowRemains(struct SOperatorInfo* pOperator, SHJoinOper while (!allFetched) { hJoinAppendResToBlock(pOperator, pJoin->midBlk, &allFetched); if (pJoin->midBlk->info.rows > 0) { - doFilter(pJoin->midBlk, pJoin->pPreFilter, NULL); + HJ_ERR_RET(doFilter(pJoin->midBlk, pJoin->pPreFilter, NULL)); if (pJoin->midBlk->info.rows > 0) { pCtx->readMatch = true; HJ_ERR_RET(hJoinCopyMergeMidBlk(pCtx, &pJoin->midBlk, &pJoin->finBlk)); @@ -170,7 +170,7 @@ int32_t hLeftJoinHandleSeqProbeRows(struct SOperatorInfo* pOperator, SHJoinOpera while (!allFetched) { hJoinAppendResToBlock(pOperator, pJoin->midBlk, &allFetched); if (pJoin->midBlk->info.rows > 0) { - doFilter(pJoin->midBlk, pJoin->pPreFilter, NULL); + HJ_ERR_RET(doFilter(pJoin->midBlk, pJoin->pPreFilter, NULL)); if (pJoin->midBlk->info.rows > 0) { pCtx->readMatch = true; HJ_ERR_RET(hJoinCopyMergeMidBlk(pCtx, &pJoin->midBlk, &pJoin->finBlk)); diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 2fe2ccc56f..bbbdff6c17 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -238,10 +238,12 @@ static int32_t hJoinInitValColsInfo(SHJoinTableCtx* pTable, SNodeList* pList) { if (NULL == pTable->valVarCols) { pTable->valVarCols = taosArrayInit(pTable->valNum, sizeof(int32_t)); if (NULL == pTable->valVarCols) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } } - taosArrayPush(pTable->valVarCols, &i); + if (NULL == taosArrayPush(pTable->valVarCols, &i)) { + return terrno; + } } pTable->valCols[i].bytes = pColNode->node.resType.bytes; if (!pTable->valCols[i].keyCol && !pTable->valCols[i].vardata) { @@ -332,7 +334,7 @@ static int32_t hJoinInitTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* return code; } - memcpy(&pTable->inputStat, pStat, sizeof(*pStat)); + TAOS_MEMCPY(&pTable->inputStat, pStat, sizeof(*pStat)); HJ_ERR_RET(hJoinInitPrimExprCtx(pTable->primExpr, &pTable->primCtx, pTable)); @@ -416,7 +418,9 @@ static FORCE_INLINE int32_t hJoinAddPageToBufs(SArray* pRowBufs) { return TSDB_CODE_OUT_OF_MEMORY; } - taosArrayPush(pRowBufs, &page); + if (NULL == taosArrayPush(pRowBufs, &page)) { + return terrno; + } return TSDB_CODE_SUCCESS; } @@ -464,12 +468,21 @@ static void hJoinDestroyKeyHash(SSHashObj** ppHash) { *ppHash = NULL; } -static FORCE_INLINE char* hJoinRetrieveColDataFromRowBufs(SArray* pRowBufs, SBufRowInfo* pRow) { +static FORCE_INLINE int32_t hJoinRetrieveColDataFromRowBufs(SArray* pRowBufs, SBufRowInfo* pRow, char** ppData) { + *ppData = NULL; + if ((uint16_t)-1 == pRow->pageId) { - return NULL; + return TSDB_CODE_SUCCESS; } SBufPageInfo *pPage = taosArrayGet(pRowBufs, pRow->pageId); - return pPage->data + pRow->offset; + if (NULL == pPage) { + qError("fail to get %d page, total:%d", pRow->pageId, (int32_t)taosArrayGetSize(pRowBufs)); + QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + + *ppData = pPage->data + pRow->offset; + + return TSDB_CODE_SUCCESS; } static int32_t hJoinCopyResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) { @@ -479,9 +492,11 @@ static int32_t hJoinCopyResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum int32_t probeIdx = 0; SBufRowInfo* pRow = pStart; int32_t code = 0; + char* pData = NULL; for (int32_t r = 0; r < rowNum; ++r) { - char* pData = hJoinRetrieveColDataFromRowBufs(pJoin->pRowBufs, pRow); + HJ_ERR_RET(hJoinRetrieveColDataFromRowBufs(pJoin->pRowBufs, pRow, &pData)); + char* pValData = pData + pBuild->valBitMapSize; char* pKeyData = pProbe->keyData; buildIdx = buildValIdx = probeIdx = 0; @@ -544,7 +559,7 @@ int32_t hJoinCopyNMatchRowsToBlock(SHJoinOperatorInfo* pJoin, SSDataBlock* pRes, SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData->pDataBlock, pProbe->valCols[probeIdx].srcSlot); SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pProbe->valCols[probeIdx].dstSlot); - colDataAssignNRows(pDst, pRes->info.rows, pSrc, startIdx, rows); + QRY_ERR_RET(colDataAssignNRows(pDst, pRes->info.rows, pSrc, startIdx, rows)); probeIdx++; } @@ -606,11 +621,11 @@ bool hJoinCopyKeyColsDataToBuf(SHJoinTableCtx* pTable, int32_t rowIdx, size_t *p } if (pTable->keyCols[i].vardata) { pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx]; - memcpy(pTable->keyBuf + bufLen, pData, varDataTLen(pData)); + TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, varDataTLen(pData)); bufLen += varDataTLen(pData); } else { pData = pTable->keyCols[i].data + pTable->keyCols[i].bytes * rowIdx; - memcpy(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes); + TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes); bufLen += pTable->keyCols[i].bytes; } } @@ -627,6 +642,10 @@ bool hJoinCopyKeyColsDataToBuf(SHJoinTableCtx* pTable, int32_t rowIdx, size_t *p static int32_t hJoinSetKeyColsData(SSDataBlock* pBlock, SHJoinTableCtx* pTable) { for (int32_t i = 0; i < pTable->keyNum; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot); + if (NULL == pCol) { + qError("fail to get %d col, total:%d", pTable->keyCols[i].srcSlot, (int32_t)taosArrayGetSize(pBlock->pDataBlock)); + QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->keyCols[i].srcSlot, pCol->info.type, pTable->keyCols[i].vardata); return TSDB_CODE_INVALID_PARA; @@ -654,6 +673,10 @@ static int32_t hJoinSetValColsData(SSDataBlock* pBlock, SHJoinTableCtx* pTable) continue; } SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->valCols[i].srcSlot); + if (NULL == pCol) { + qError("fail to get %d col, total:%d", pTable->valCols[i].srcSlot, (int32_t)taosArrayGetSize(pBlock->pDataBlock)); + QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } if (pTable->valCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->valCols[i].srcSlot, pCol->info.type, pTable->valCols[i].vardata); return TSDB_CODE_INVALID_PARA; @@ -683,7 +706,7 @@ static FORCE_INLINE void hJoinCopyValColsDataToBuf(SHJoinTableCtx* pTable, int32 char *pData = NULL; size_t bufLen = pTable->valBitMapSize; - memset(pTable->valData, 0, pTable->valBitMapSize); + TAOS_MEMSET(pTable->valData, 0, pTable->valBitMapSize); for (int32_t i = 0, m = 0; i < pTable->valNum; ++i) { if (pTable->valCols[i].keyCol) { continue; @@ -693,7 +716,7 @@ static FORCE_INLINE void hJoinCopyValColsDataToBuf(SHJoinTableCtx* pTable, int32 colDataSetNull_f(pTable->valData, m); } else { pData = pTable->valCols[i].data + pTable->valCols[i].offset[rowIdx]; - memcpy(pTable->valData + bufLen, pData, varDataTLen(pData)); + TAOS_MEMCPY(pTable->valData + bufLen, pData, varDataTLen(pData)); bufLen += varDataTLen(pData); } } else { @@ -701,7 +724,7 @@ static FORCE_INLINE void hJoinCopyValColsDataToBuf(SHJoinTableCtx* pTable, int32 colDataSetNull_f(pTable->valData, m); } else { pData = pTable->valCols[i].data + pTable->valCols[i].bytes * rowIdx; - memcpy(pTable->valData + bufLen, pData, pTable->valCols[i].bytes); + TAOS_MEMCPY(pTable->valData + bufLen, pData, pTable->valCols[i].bytes); bufLen += pTable->valCols[i].bytes; } } @@ -1009,7 +1032,11 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) { } if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) { - doFilter(pRes, pJoin->pFinFilter, NULL); + code = doFilter(pRes, pJoin->pFinFilter, NULL); + if (code) { + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } } if (pRes->info.rows > 0) { return pRes; @@ -1037,7 +1064,11 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) { } if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) { - doFilter(pRes, pJoin->pFinFilter, NULL); + code = doFilter(pRes, pJoin->pFinFilter, NULL); + if (code) { + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } } if (pRes->info.rows > 0) { @@ -1118,13 +1149,25 @@ static uint32_t hJoinGetFinBlkCapacity(SHJoinOperatorInfo* pJoin, SHashJoinPhysi int32_t hJoinInitResBlocks(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode) { pJoin->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); + if (NULL == pJoin->finBlk) { + QRY_ERR_RET(terrno); + } ASSERT(pJoinNode->node.pOutputDataBlockDesc->totalRowSize > 0); - blockDataEnsureCapacity(pJoin->finBlk, hJoinGetFinBlkCapacity(pJoin, pJoinNode)); + int32_t code = blockDataEnsureCapacity(pJoin->finBlk, hJoinGetFinBlkCapacity(pJoin, pJoinNode)); + if (TSDB_CODE_SUCCESS != code) { + QRY_ERR_RET(terrno); + } if (NULL != pJoin->pPreFilter) { pJoin->midBlk = createOneDataBlock(pJoin->finBlk, false); - blockDataEnsureCapacity(pJoin->midBlk, pJoin->finBlk->info.capacity); + if (NULL == pJoin->finBlk) { + QRY_ERR_RET(terrno); + } + code = blockDataEnsureCapacity(pJoin->midBlk, pJoin->finBlk->info.capacity); + if (TSDB_CODE_SUCCESS != code) { + QRY_ERR_RET(terrno); + } } pJoin->blkThreshold = pJoin->finBlk->info.capacity * HJOIN_BLK_THRESHOLD_RATIO; @@ -1151,8 +1194,8 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n setOperatorInfo(pOperator, "HashJoinOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - hJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); - hJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); + HJ_ERR_JRET(hJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0])); + HJ_ERR_JRET(hJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1])); hJoinSetBuildAndProbeTable(pInfo, pJoinNode); @@ -1182,6 +1225,7 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n return pOperator; _return: + if (pInfo != NULL) { destroyHashJoinOperator(pInfo); } diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index 5f0a2eadfb..1c625df52f 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -357,7 +357,7 @@ static int32_t mOuterJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop) do { startRowIdx = build->grpRowIdx; - mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build); + (void)mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build); if (pCtx->midBlk->info.rows > 0) { if (build->rowBitmapSize > 0) { @@ -1283,7 +1283,7 @@ static int32_t mSemiJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, SMJoinGrpRows* p do { blockDataCleanup(pCtx->midBlk); - mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build); + (void)mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build); if (pCtx->midBlk->info.rows > 0) { MJ_ERR_RET(mJoinFilterAndKeepSingleRow(pCtx->midBlk, pCtx->pJoin->pPreFilter)); @@ -1358,7 +1358,7 @@ static int32_t mSemiJoinHashFullCart(SMJoinMergeCtx* pCtx) { build->pHashCurGrp = *(SArray**)pGrp; ASSERT(1 == taosArrayGetSize(build->pHashCurGrp)); build->grpRowIdx = 0; - mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build); + (void)mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build); ASSERT(build->grpRowIdx < 0); } @@ -1616,7 +1616,7 @@ static int32_t mAntiJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, SMJoinGrpRows* p do { blockDataCleanup(pCtx->midBlk); - mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build); + (void)mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build); if (pCtx->midBlk->info.rows > 0) { MJ_ERR_RET(mJoinFilterAndNoKeepRows(pCtx->midBlk, pCtx->pJoin->pPreFilter)); @@ -2102,7 +2102,9 @@ int32_t mAsofBackwardHandleGrpRemains(SMJoinWindowCtx* pCtx) { return (pCtx->lastEqGrp) ? mAsofBackwardDumpUpdateEqRows(pCtx, pCtx->pJoin, false, true) : mAsofBackwardDumpGrpCache(pCtx); } -static bool mAsofBackwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { +static int32_t mAsofBackwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx, bool* newBlock) { + *newBlock = false; + bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool buildGot = false; @@ -2116,7 +2118,7 @@ static bool mAsofBackwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* mJoinSetDone(pOperator); } - return false; + return TSDB_CODE_SUCCESS; } break; @@ -2124,13 +2126,18 @@ static bool mAsofBackwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* if (buildGot && NULL == pCtx->cache.outBlk) { pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false); - blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit); + if (NULL == pCtx->cache.outBlk) { + MJ_ERR_RET(terrno); + } + MJ_ERR_RET(blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit)); } pCtx->probeGrp.blk = pJoin->probe->blk; pCtx->buildGrp.blk = pJoin->build->blk; - return true; + *newBlock = true; + + return TSDB_CODE_SUCCESS; } @@ -2142,6 +2149,7 @@ SSDataBlock* mAsofBackwardJoinDo(struct SOperatorInfo* pOperator) { int64_t buildTs = 0; SColumnInfoData* pBuildCol = NULL; SColumnInfoData* pProbeCol = NULL; + bool newBlock = false; blockDataCleanup(pCtx->finBlk); @@ -2154,7 +2162,8 @@ SSDataBlock* mAsofBackwardJoinDo(struct SOperatorInfo* pOperator) { } do { - if (!mAsofBackwardRetrieve(pOperator, pJoin, pCtx)) { + MJ_ERR_JRET(mAsofBackwardRetrieve(pOperator, pJoin, pCtx, &newBlock)); + if (!newBlock) { if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) { continue; } @@ -2271,7 +2280,7 @@ int32_t mAsofForwardChkFillGrpCache(SMJoinWindowCtx* pCtx) { pGrp->readIdx = 0; //pGrp->endIdx = pGrp->blk->info.rows - 1; } else { - taosArrayPop(pCache->grps); + (void)taosArrayPop(pCache->grps); pGrp = taosArrayGet(pCache->grps, 0); ASSERT(pGrp->blk == pCache->outBlk); //pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx; @@ -2479,7 +2488,9 @@ int32_t mAsofForwardSkipBuildGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoi return TSDB_CODE_SUCCESS; } -static bool mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { +static int32_t mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx, bool* newBlock) { + *newBlock = false; + bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool buildGot = false; @@ -2516,7 +2527,10 @@ static bool mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* p if (buildGot && pJoin->build->newBlk) { if (NULL == pCtx->cache.outBlk) { pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false); - blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit); + if (NULL == pCtx->cache.outBlk) { + MJ_ERR_RET(terrno); + } + MJ_ERR_RET(blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit)); } MJOIN_PUSH_BLK_TO_CACHE(&pCtx->cache, pJoin->build->blk); @@ -2537,6 +2551,7 @@ SSDataBlock* mAsofForwardJoinDo(struct SOperatorInfo* pOperator) { int64_t buildTs = 0; SColumnInfoData* pBuildCol = NULL; SColumnInfoData* pProbeCol = NULL; + bool newBlock = false; blockDataCleanup(pCtx->finBlk); @@ -2549,7 +2564,8 @@ SSDataBlock* mAsofForwardJoinDo(struct SOperatorInfo* pOperator) { } do { - if (!mAsofForwardRetrieve(pOperator, pJoin, pCtx)) { + MJ_ERR_JRET(mAsofForwardRetrieve(pOperator, pJoin, pCtx, &newBlock)); + if (!newBlock) { if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) { continue; } @@ -2643,7 +2659,7 @@ static FORCE_INLINE void mWinJoinPopFrontGroup(SMJoinWindowCtx* pCtx, SMJoinGrpR if (pGrp->blk == pCtx->cache.outBlk) { blockDataCleanup(pGrp->blk); } else if (pGrp->clonedBlk) { - blockDataDestroy(pGrp->blk); + (void)blockDataDestroy(pGrp->blk); } taosArrayPopFrontBatch(pCtx->cache.grps, 1); @@ -2667,6 +2683,10 @@ static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) { pGrp->beginIdx = 0; pGrp->readIdx = 0; } + + if (NULL == pGrp->blk) { + MJ_ERR_RET(terrno); + } pGrp->clonedBlk = true; } @@ -2674,14 +2694,16 @@ static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { +static int32_t mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx, bool* newBlock) { + *newBlock = false; + bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool buildGot = false; do { if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { if (NULL == pJoin->build->blk) { - mWinJoinCloneCacheBlk(pCtx); + MJ_ERR_RET(mWinJoinCloneCacheBlk(pCtx)); } buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); @@ -2692,7 +2714,7 @@ static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin mJoinSetDone(pOperator); } - return false; + return TSDB_CODE_SUCCESS; } if (buildGot && pCtx->forwardRowsAcq) { @@ -2709,8 +2731,9 @@ static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin } while (true); pCtx->probeGrp.blk = pJoin->probe->blk; - - return true; + *newBlock = true; + + return TSDB_CODE_SUCCESS; } int32_t mWinJoinTryAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) { @@ -3220,6 +3243,7 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int64_t probeTs = 0; SColumnInfoData* pProbeCol = NULL; + bool newBlock = false; blockDataCleanup(pCtx->finBlk); @@ -3232,7 +3256,8 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) { } do { - if (!mWinJoinRetrieve(pOperator, pJoin, pCtx)) { + MJ_ERR_JRET(mWinJoinRetrieve(pOperator, pJoin, pCtx, &newBlock)); + if (!newBlock) { if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) { continue; } @@ -3367,7 +3392,7 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p } pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); - blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode)); + MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode))); pCtx->blkThreshold = pCtx->finBlk->info.capacity * MJOIN_BLK_THRESHOLD_RATIO; @@ -3407,11 +3432,14 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); ASSERT(pJoinNode->node.pOutputDataBlockDesc->totalRowSize > 0); - blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode)); + MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode))); if (pJoin->pFPreFilter) { pCtx->midBlk = createOneDataBlock(pCtx->finBlk, false); - blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity); + if (NULL == pCtx->midBlk) { + MJ_ERR_RET(terrno); + } + MJ_ERR_RET(blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity)); } pCtx->blkThreshold = pCtx->finBlk->info.capacity * MJOIN_BLK_THRESHOLD_RATIO; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 2e2101231b..9db2c7d300 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1147,6 +1147,7 @@ void mJoinResetForBuildTable(SMJoinTableCtx* pTable) { int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) { SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId); SMJoinGrpRows* pGrp = NULL; + int32_t code = TSDB_CODE_SUCCESS; if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) { return TSDB_CODE_SUCCESS; @@ -1207,7 +1208,12 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol if (0 == pGrp->beginIdx && pTable->multiEqGrpRows && 0 == pTable->eqRowLimit) { pGrp->blk = createOneDataBlock(pTable->blk, true); - taosArrayPush(pTable->createdBlks, &pGrp->blk); + if (NULL == pGrp->blk) { + MJ_ERR_JRET(terrno); + } + if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) { + MJ_ERR_JRET(terrno); + } } else { if (!pTable->multiEqGrpRows) { pGrp->endIdx = pGrp->beginIdx;