From 45348b15be25cd23b8bfc01f8713b68111fe5422 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 22 Jul 2024 11:06:24 +0800 Subject: [PATCH] fix: return code validation --- source/libs/executor/inc/mergejoin.h | 11 +- source/libs/executor/src/groupcacheoperator.c | 8 +- source/libs/executor/src/mergejoin.c | 262 ++++++++++++++- source/libs/executor/src/mergejoinoperator.c | 316 ++++++++++++++---- 4 files changed, 503 insertions(+), 94 deletions(-) diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index f6f1cf26f3..64db1a57a0 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -432,6 +432,15 @@ typedef struct SMJoinOperatorInfo { } \ } while (0) +#define MJ_RET(c) \ + do { \ + int32_t _code = c; \ + if (_code != TSDB_CODE_SUCCESS) { \ + terrno = _code; \ + } \ + return _code; \ + } while (0) + void mJoinDestroyMergeCtx(SMJoinOperatorInfo* pJoin); @@ -459,7 +468,7 @@ int32_t mJoinCreateFullBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pT int32_t mJoinCreateBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable); int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable); int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastBuildGrp); -bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build); +int32_t mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build, bool* cont); int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond); int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx); int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp, bool probeGrp); diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 379cceeb03..e9a6c14141 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -149,7 +149,7 @@ void blockDataDeepCleanup(SSDataBlock* pDataBlock) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); if (NULL == p) { - qError("fail to get %dth col in dataBlock, numOfCols:%d", i, numOfCols); + qError("fail to get %dth col in dataBlock, numOfCols:%d", i, (int32_t)numOfCols); continue; } taosMemoryFreeClear(p->pData); @@ -352,7 +352,7 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr } pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId)); if (NULL == pBufInfo) { - qError("fail to get blk %d from pCache->pDirtyBlk", pBufInfo->basic.blkId); + qError("fail to get blk %" PRId64 " from pCache->pDirtyBlk", pBufInfo->basic.blkId); QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); } int32_t code = TSDB_CODE_SUCCESS; @@ -462,7 +462,7 @@ void blockDataDeepClear(SSDataBlock* pDataBlock) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); if (NULL == p) { - qError("fail to get %d col from pDataBlock, numOfCols:%d", i, numOfCols); + qError("fail to get %d col from pDataBlock, numOfCols:%d", i, (int32_t)numOfCols); continue; } p->pData = NULL; @@ -1239,7 +1239,7 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP *ppSession = taosHashGet(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); if (NULL == *ppSession) { - qError("fail to get session %d from pSessions", pGcParam->sessionId); + qError("fail to get session %" PRId64 " from pSessions", pGcParam->sessionId); QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); } diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index 1c625df52f..2c5121855a 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -72,9 +72,15 @@ int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) { if ((!pCtx->seqWinGrp) && 0 == cache->grpIdx && probeRows * buildTotalRows <= rowsLeft) { SMJoinGrpRows* pFirstBuild = taosArrayGet(cache->grps, 0); + if (NULL == pFirstBuild) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } if (pFirstBuild->readIdx == pFirstBuild->beginIdx) { for (; cache->grpIdx < buildGrpNum; ++cache->grpIdx) { SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, cache->grpIdx); + if (NULL == buildGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); buildGrp->readIdx = buildGrp->beginIdx; } @@ -89,6 +95,9 @@ int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) { probeGrp->endIdx = probeGrp->readIdx; for (; cache->grpIdx < buildGrpNum && rowsLeft > 0; ++cache->grpIdx) { SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, cache->grpIdx); + if (NULL == buildGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) { MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); @@ -134,7 +143,8 @@ static int32_t mOuterJoinHashFullCart(SMJoinMergeCtx* pCtx) { SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); if (build->grpRowIdx >= 0) { - bool contLoop = mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build); + bool contLoop = false; + MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, &contLoop)); if (build->grpRowIdx < 0) { probeGrp->readIdx++; } @@ -171,7 +181,8 @@ static int32_t mOuterJoinHashFullCart(SMJoinMergeCtx* pCtx) { } build->grpRowIdx = 0; - bool contLoop = mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build); + bool contLoop = false; + MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, &contLoop)); if (!contLoop) { if (build->grpRowIdx < 0) { probeGrp->readIdx++; @@ -193,15 +204,27 @@ static int32_t mOuterJoinMergeFullCart(SMJoinMergeCtx* pCtx) { SMJoinTableCtx* probe = pCtx->pJoin->probe; SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0); + if (NULL == probeGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + int32_t buildGrpNum = taosArrayGetSize(build->eqGrps); int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); int32_t probeEndIdx = probeGrp->endIdx; if (0 == build->grpIdx && probeRows * build->grpTotalRows <= rowsLeft) { SMJoinGrpRows* pFirstBuild = taosArrayGet(build->eqGrps, 0); + if (NULL == pFirstBuild) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (pFirstBuild->readIdx == pFirstBuild->beginIdx) { for (; build->grpIdx < buildGrpNum; ++build->grpIdx) { SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); + if (NULL == buildGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); buildGrp->readIdx = buildGrp->beginIdx; } @@ -215,6 +238,9 @@ static int32_t mOuterJoinMergeFullCart(SMJoinMergeCtx* pCtx) { probeGrp->endIdx = probeGrp->readIdx; for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) { SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); + if (NULL == buildGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) { MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); @@ -254,6 +280,10 @@ static int32_t mOuterJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { SMJoinTableCtx* probe = pCtx->pJoin->probe; SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); + if (NULL == probeGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + int32_t buildGrpNum = taosArrayGetSize(build->eqGrps); int32_t probeEndIdx = probeGrp->endIdx; int32_t rowsLeft = pCtx->midBlk->info.capacity; @@ -274,6 +304,10 @@ static int32_t mOuterJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) { SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); + if (NULL == buildGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (startRowIdx < 0) { startRowIdx = buildGrp->readIdx; } @@ -351,13 +385,17 @@ static int32_t mOuterJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop) SMJoinTableCtx* probe = pCtx->pJoin->probe; SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); + if (NULL == probeGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + int32_t startRowIdx = 0; //blockDataCleanup(pCtx->midBlk); do { startRowIdx = build->grpRowIdx; - (void)mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build); + MJ_ERR_RET(mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build, NULL)); if (pCtx->midBlk->info.rows > 0) { if (build->rowBitmapSize > 0) { @@ -406,6 +444,10 @@ static int32_t mOuterJoinHashSeqCart(SMJoinMergeCtx* pCtx) { SMJoinTableCtx* probe = pCtx->pJoin->probe; SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0); + if (NULL == probeGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + bool contLoop = false; if (build->grpRowIdx >= 0) { @@ -500,7 +542,15 @@ static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoi if (buildGot) { SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId); + if (NULL == pProbeCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId); + if (NULL == pBuildCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) { pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; buildGot = false; @@ -649,15 +699,27 @@ static int32_t mInnerJoinMergeCart(SMJoinMergeCtx* pCtx) { SMJoinTableCtx* probe = pCtx->pJoin->probe; SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0); + if (NULL == probeGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + int32_t buildGrpNum = taosArrayGetSize(build->eqGrps); int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); int32_t probeEndIdx = probeGrp->endIdx; if (0 == build->grpIdx && probeRows * build->grpTotalRows <= rowsLeft) { SMJoinGrpRows* pFirstBuild = taosArrayGet(build->eqGrps, 0); + if (NULL == pFirstBuild) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (pFirstBuild->readIdx == pFirstBuild->beginIdx) { for (; build->grpIdx < buildGrpNum; ++build->grpIdx) { SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); + if (NULL == buildGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); buildGrp->readIdx = buildGrp->beginIdx; } @@ -671,6 +733,9 @@ static int32_t mInnerJoinMergeCart(SMJoinMergeCtx* pCtx) { probeGrp->endIdx = probeGrp->readIdx; for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) { SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); + if (NULL == buildGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) { MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); @@ -711,9 +776,13 @@ static int32_t mInnerJoinHashCart(SMJoinMergeCtx* pCtx) { SMJoinTableCtx* probe = pCtx->pJoin->probe; SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); + if (NULL == probeGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } if (build->grpRowIdx >= 0) { - bool contLoop = mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build); + bool contLoop = false; + MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, &contLoop)); if (build->grpRowIdx < 0) { probeGrp->readIdx++; } @@ -734,7 +803,8 @@ static int32_t mInnerJoinHashCart(SMJoinMergeCtx* pCtx) { if (NULL != pGrp) { build->pHashCurGrp = *pGrp; build->grpRowIdx = 0; - bool contLoop = mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build); + bool contLoop = false; + MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, &contLoop)); if (!contLoop) { if (build->grpRowIdx < 0) { probeGrp->readIdx++; @@ -772,7 +842,15 @@ static bool mInnerJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJo if (buildGot) { SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId); + if (NULL == pProbeCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId); + if (NULL == pBuildCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) { pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; buildGot = false; @@ -903,6 +981,10 @@ static int32_t mFullJoinMergeCart(SMJoinMergeCtx* pCtx) { static FORCE_INLINE int32_t mFullJoinOutputHashRow(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows, int32_t idx) { SMJoinGrpRows grp = {0}; SMJoinRowPos* pPos = taosArrayGet(pGrpRows->pRows, idx); + if (NULL == pPos) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + grp.blk = pPos->pBlk; grp.readIdx = pPos->pos; grp.endIdx = pPos->pos; @@ -1042,6 +1124,10 @@ static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) { grpDone = false; SMJoinGrpRows* pGrpRows = taosArrayGet(build->eqGrps, pNMatch->grpIdx); + if (NULL == pGrpRows) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (pGrpRows->allRowsMatch) { continue; } @@ -1283,7 +1369,7 @@ static int32_t mSemiJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, SMJoinGrpRows* p do { blockDataCleanup(pCtx->midBlk); - (void)mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build); + MJ_ERR_RET(mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build, NULL)); if (pCtx->midBlk->info.rows > 0) { MJ_ERR_RET(mJoinFilterAndKeepSingleRow(pCtx->midBlk, pCtx->pJoin->pPreFilter)); @@ -1312,6 +1398,9 @@ static int32_t mSemiJoinHashSeqCart(SMJoinMergeCtx* pCtx) { SMJoinTableCtx* probe = pCtx->pJoin->probe; SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0); + if (NULL == probeGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } size_t bufLen = 0; int32_t probeEndIdx = probeGrp->endIdx; @@ -1343,6 +1432,10 @@ static int32_t mSemiJoinHashFullCart(SMJoinMergeCtx* pCtx) { SMJoinTableCtx* probe = pCtx->pJoin->probe; SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); + if (NULL == probeGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + size_t bufLen = 0; for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) { @@ -1358,7 +1451,7 @@ static int32_t mSemiJoinHashFullCart(SMJoinMergeCtx* pCtx) { build->pHashCurGrp = *(SArray**)pGrp; ASSERT(1 == taosArrayGetSize(build->pHashCurGrp)); build->grpRowIdx = 0; - (void)mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build); + MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, NULL)); ASSERT(build->grpRowIdx < 0); } @@ -1372,6 +1465,10 @@ static int32_t mSemiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { SMJoinTableCtx* probe = pCtx->pJoin->probe; SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); + if (NULL == probeGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + SMJoinGrpRows* buildGrp = NULL; int32_t buildGrpNum = taosArrayGetSize(build->eqGrps); int32_t probeEndIdx = probeGrp->endIdx; @@ -1387,6 +1484,9 @@ static int32_t mSemiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { blockDataCleanup(pCtx->midBlk); for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) { buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); + if (NULL == buildGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) { MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp)); @@ -1448,6 +1548,10 @@ static int32_t mSemiJoinMergeFullCart(SMJoinMergeCtx* pCtx) { SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0); SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, 0); + if (NULL == buildGrp || NULL == probeGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); int32_t probeEndIdx = probeGrp->endIdx; @@ -1584,6 +1688,10 @@ static int32_t mAntiJoinHashFullCart(SMJoinMergeCtx* pCtx) { SMJoinTableCtx* probe = pCtx->pJoin->probe; SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); + if (NULL == probeGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + size_t bufLen = 0; int32_t probeEndIdx = probeGrp->endIdx; @@ -1616,7 +1724,7 @@ static int32_t mAntiJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, SMJoinGrpRows* p do { blockDataCleanup(pCtx->midBlk); - (void)mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build); + MJ_ERR_RET(mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build, NULL)); if (pCtx->midBlk->info.rows > 0) { MJ_ERR_RET(mJoinFilterAndNoKeepRows(pCtx->midBlk, pCtx->pJoin->pPreFilter)); @@ -1642,6 +1750,10 @@ static int32_t mAntiJoinHashSeqCart(SMJoinMergeCtx* pCtx) { SMJoinTableCtx* probe = pCtx->pJoin->probe; SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0); + if (NULL == probeGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + size_t bufLen = 0; int32_t probeEndIdx = probeGrp->endIdx; @@ -1682,6 +1794,10 @@ static int32_t mAntiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { SMJoinTableCtx* probe = pCtx->pJoin->probe; SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); + if (NULL == probeGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + SMJoinGrpRows* buildGrp = NULL; int32_t buildGrpNum = taosArrayGetSize(build->eqGrps); int32_t probeEndIdx = probeGrp->endIdx; @@ -1697,6 +1813,10 @@ static int32_t mAntiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { blockDataCleanup(pCtx->midBlk); for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) { buildGrp = taosArrayGet(build->eqGrps, build->grpIdx); + if (NULL == buildGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) { MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp)); rowsLeft -= GRP_REMAIN_ROWS(buildGrp); @@ -1889,6 +2009,9 @@ int32_t mAsofBackwardAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWin grp.blk = pTable->blk; SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId); + if (NULL == pCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) { return TSDB_CODE_SUCCESS; @@ -2249,6 +2372,10 @@ int32_t mAsofForwardTrimCacheBlk(SMJoinWindowCtx* pCtx) { } SMJoinGrpRows* pGrp = taosArrayGet(pCtx->cache.grps, 0); + if (NULL == pGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (pGrp->blk == pCtx->cache.outBlk && pCtx->pJoin->build->blkRowIdx > 0) { MJ_ERR_RET(blockDataTrimFirstRows(pGrp->blk, pCtx->pJoin->build->blkRowIdx)); pCtx->pJoin->build->blkRowIdx = 0; @@ -2271,6 +2398,10 @@ int32_t mAsofForwardChkFillGrpCache(SMJoinWindowCtx* pCtx) { int32_t grpNum = taosArrayGetSize(pCache->grps); if (grpNum >= 1) { SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, grpNum - 1); + if (NULL == pGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (pGrp->blk != pCache->outBlk) { int32_t beginIdx = (1 == grpNum) ? build->blkRowIdx : 0; MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, pGrp->blk, beginIdx, pGrp->blk->info.rows - beginIdx)); @@ -2282,6 +2413,10 @@ int32_t mAsofForwardChkFillGrpCache(SMJoinWindowCtx* pCtx) { } else { (void)taosArrayPop(pCache->grps); pGrp = taosArrayGet(pCache->grps, 0); + if (NULL == pGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + ASSERT(pGrp->blk == pCache->outBlk); //pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx; } @@ -2320,16 +2455,20 @@ int32_t mAsofForwardChkFillGrpCache(SMJoinWindowCtx* pCtx) { return TSDB_CODE_SUCCESS; } -void mAsofForwardUpdateBuildGrpEndIdx(SMJoinWindowCtx* pCtx) { +int32_t mAsofForwardUpdateBuildGrpEndIdx(SMJoinWindowCtx* pCtx) { int32_t grpNum = taosArrayGetSize(pCtx->cache.grps); if (grpNum <= 0) { - return; + return TSDB_CODE_SUCCESS; } SMJoinGrpRows* pGrp = taosArrayGet(pCtx->cache.grps, 0); + if (NULL == pGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (1 == grpNum) { pGrp->endIdx = pGrp->beginIdx + TMIN(pGrp->blk->info.rows - pGrp->beginIdx, pCtx->jLimit) - 1; - return; + return TSDB_CODE_SUCCESS; } ASSERT(pCtx->jLimit > (pGrp->blk->info.rows - pGrp->beginIdx)); @@ -2338,7 +2477,13 @@ void mAsofForwardUpdateBuildGrpEndIdx(SMJoinWindowCtx* pCtx) { int64_t remainRows = pCtx->jLimit - (pGrp->endIdx - pGrp->beginIdx + 1); pGrp = taosArrayGet(pCtx->cache.grps, 1); + if (NULL == pGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + pGrp->endIdx = pGrp->beginIdx + TMIN(pGrp->blk->info.rows, remainRows) - 1; + + return TSDB_CODE_SUCCESS; } int32_t mAsofForwardFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) { @@ -2347,13 +2492,16 @@ int32_t mAsofForwardFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) { MJ_ERR_RET(mAsofForwardChkFillGrpCache(pCtx)); } - mAsofForwardUpdateBuildGrpEndIdx(pCtx); + MJ_ERR_RET(mAsofForwardUpdateBuildGrpEndIdx(pCtx)); return mWinJoinDumpGrpCache(pCtx); } int32_t mAsofForwardSkipEqRows(SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk) { SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId); + if (NULL == pCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) { *wholeBlk = false; @@ -2513,6 +2661,10 @@ static int32_t mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo if (buildGot) { SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId); SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId); + if (NULL == pProbeCol || NULL == pBuildCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) { pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; MJOIN_POP_TB_BLK(&pCtx->cache); @@ -2674,6 +2826,10 @@ static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) { } SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayGetLast(pGrpArray); + if (NULL == pGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (!pGrp->clonedBlk) { if (0 == pGrp->beginIdx) { pGrp->blk = createOneDataBlock(pGrp->blk, true); @@ -2720,6 +2876,10 @@ static int32_t mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJ if (buildGot && pCtx->forwardRowsAcq) { SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId); SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId); + if (NULL == pProbeCol || NULL == pBuildCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) { pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; buildGot = false; @@ -2739,6 +2899,10 @@ static int32_t mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJ int32_t mWinJoinTryAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) { SSDataBlock* pBlk = build->blk; SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCtx.targetSlotId); + if (NULL == pCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (pCtx->ascTs) { if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) < pCtx->winBeginTs) { *winEnd = false; @@ -2759,6 +2923,9 @@ int32_t mWinJoinTryAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, if (*((int64_t*)pCol->pData + build->blkRowIdx) <= pCtx->winEndTs) { SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx}; SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp); + if (NULL == pGrp) { + MJ_ERR_RET(terrno); + } pGrp->readIdx = pGrp->beginIdx; pGrp->endIdx = pGrp->beginIdx; @@ -2795,7 +2962,10 @@ int32_t mWinJoinTryAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, if (*((int64_t*)pCol->pData + build->blkRowIdx) >= pCtx->winBeginTs) { SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx}; SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp); - + if (NULL == pGrp) { + MJ_ERR_RET(terrno); + } + pGrp->readIdx = pGrp->beginIdx; pGrp->endIdx = pGrp->beginIdx; @@ -2864,7 +3034,15 @@ int32_t mWinJoinMoveAscWinBegin(SMJoinWindowCtx* pCtx) { int32_t grpNum = taosArrayGetSize(pCache->grps); for (int32_t i = 0; i < grpNum; ++i) { SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i); + if (NULL == pGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId); + if (NULL == pCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) < pCtx->winBeginTs) { mWinJoinPopFrontGroup(pCtx, pGrp); grpNum--; @@ -2900,6 +3078,7 @@ int32_t mWinJoinMoveAscWinBegin(SMJoinWindowCtx* pCtx) { pCache->grps = pCache->grpsQueue; pCache->rowNum = 1; pCache->grpsQueue = NULL; + continue; } @@ -2916,7 +3095,15 @@ int32_t mWinJoinMoveDescWinBegin(SMJoinWindowCtx* pCtx) { int32_t grpNum = taosArrayGetSize(pCache->grps); for (int32_t i = 0; i < grpNum; ++i) { SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i); + if (NULL == pGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId); + if (NULL == pCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) > pCtx->winEndTs) { mWinJoinPopFrontGroup(pCtx, pGrp); @@ -2953,6 +3140,7 @@ int32_t mWinJoinMoveDescWinBegin(SMJoinWindowCtx* pCtx) { pCache->grps = pCache->grpsQueue; pCache->rowNum = 1; pCache->grpsQueue = NULL; + continue; } @@ -2986,6 +3174,10 @@ void mWinJoinRemoveOverflowGrp(SMJoinWindowCtx* pCtx) { int32_t mWinJoinTryAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) { SSDataBlock* pBlk = build->blk; SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCtx.targetSlotId); + if (NULL == pCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx}; if (pCtx->ascTs) { @@ -3001,6 +3193,9 @@ int32_t mWinJoinTryAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, S if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) <= pCtx->winEndTs) { SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp); + if (NULL == pGrp) { + MJ_ERR_RET(terrno); + } pGrp->readIdx = pGrp->beginIdx; pGrp->endIdx = pBlk->info.rows - 1; @@ -3028,6 +3223,9 @@ int32_t mWinJoinTryAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, S } SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp); + if (NULL == pGrp) { + MJ_ERR_RET(terrno); + } pGrp->readIdx = pGrp->beginIdx; pGrp->endIdx = build->blkRowIdx - 1; @@ -3048,6 +3246,9 @@ int32_t mWinJoinTryAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, S if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) >= pCtx->winBeginTs) { SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp); + if (NULL == pGrp) { + MJ_ERR_RET(terrno); + } pGrp->readIdx = pGrp->beginIdx; pGrp->endIdx = pBlk->info.rows - 1; @@ -3070,7 +3271,10 @@ int32_t mWinJoinTryAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, S } SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp); - + if (NULL == pGrp) { + MJ_ERR_RET(terrno); + } + pGrp->readIdx = pGrp->beginIdx; pGrp->endIdx = build->blkRowIdx - 1; @@ -3130,7 +3334,15 @@ int32_t mWinJoinMoveAscWinEnd(SMJoinWindowCtx* pCtx) { } SMJoinGrpRows* pGrp = taosArrayGetLast(pCache->grps); + if (NULL == pGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId); + if (NULL == pCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) <= pCtx->winEndTs) { pCache->rowNum += pGrp->blk->info.rows - pGrp->endIdx - 1; if (pCache->rowNum >= pCtx->jLimit) { @@ -3173,7 +3385,15 @@ int32_t mWinJoinMoveDescWinEnd(SMJoinWindowCtx* pCtx) { } SMJoinGrpRows* pGrp = taosArrayGetLast(pCache->grps); + if (NULL == pGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId); + if (NULL == pCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) >= pCtx->winBeginTs) { pCache->rowNum += pGrp->blk->info.rows - pGrp->endIdx - 1; pGrp->endIdx = pGrp->blk->info.rows - 1; @@ -3217,6 +3437,10 @@ int32_t mWinJoinTrimDumpGrpCache(SMJoinWindowCtx* pCtx) { int32_t buildGrpNum = taosArrayGetSize(cache->grps); for (int32_t i = 0; i < buildGrpNum && skipRows > 0; ++i) { SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, i); + if (NULL == buildGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (skipRows >= GRP_REMAIN_ROWS(buildGrp)) { skipRows -= GRP_REMAIN_ROWS(buildGrp); mWinJoinPopFrontGroup(pCtx, buildGrp); @@ -3318,7 +3542,7 @@ int32_t mJoinInitWindowCache(SMJoinWinCache* pCache, SMJoinOperatorInfo* pJoin, pCache->grps = taosArrayInit(2, sizeof(SMJoinGrpRows)); if (NULL == pCache->grps) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } //taosArrayReserve(pTable->eqGrps, 1); @@ -3392,6 +3616,10 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p } pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); + if (NULL == pCtx->finBlk) { + MJ_ERR_RET(terrno); + } + MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode))); pCtx->blkThreshold = pCtx->finBlk->info.capacity * MJOIN_BLK_THRESHOLD_RATIO; @@ -3430,6 +3658,10 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ } pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); + if (NULL == pCtx->finBlk) { + MJ_ERR_RET(terrno); + } + ASSERT(pJoinNode->node.pOutputDataBlockDesc->totalRowSize > 0); MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode))); diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 9db2c7d300..8d94841847 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -30,6 +30,9 @@ int32_t mJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) { SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId); + if (NULL == pCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } pGrp->beginIdx = pTable->blkRowIdx; pGrp->readIdx = pTable->blkRowIdx; @@ -59,12 +62,16 @@ int32_t mJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBl } -void mJoinTrimKeepFirstRow(SSDataBlock* pBlock) { +int32_t mJoinTrimKeepFirstRow(SSDataBlock* pBlock) { int32_t bmLen = BitmapLen(pBlock->info.rows); size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); + if (NULL == pDst) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + // it is a reserved column for scalar function, and no data in this column yet. if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) { continue; @@ -86,7 +93,7 @@ void mJoinTrimKeepFirstRow(SSDataBlock* pBlock) { } else { bool isNull = colDataIsNull_f(pDst->nullbitmap, 0); - memset(pDst->nullbitmap, 0, bmLen); + TAOS_MEMSET(pDst->nullbitmap, 0, bmLen); if (isNull) { colDataSetNull_f(pDst->nullbitmap, 0); } @@ -94,11 +101,14 @@ void mJoinTrimKeepFirstRow(SSDataBlock* pBlock) { } pBlock->info.rows = 1; + + return TSDB_CODE_SUCCESS; } -void mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) { +int32_t mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) { // int32_t totalRows = pBlock->info.rows; + int32_t code = TSDB_CODE_SUCCESS; int32_t bmLen = BitmapLen(totalRows); char* pBitmap = NULL; int32_t maxRows = 0; @@ -106,6 +116,10 @@ void mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBo for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); + if (NULL == pDst) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + // it is a reserved column for scalar function, and no data in this column yet. if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) { continue; @@ -135,8 +149,12 @@ void mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBo len = varDataTLen(p1); } char* p2 = taosMemoryMalloc(len); - memcpy(p2, p1, len); - colDataSetVal(pDst, numOfRows, p2, false); + TAOS_MEMCPY(p2, p1, len); + code = colDataSetVal(pDst, numOfRows, p2, false); + if (code) { + taosMemoryFreeClear(p2); + MJ_ERR_RET(terrno); + } taosMemoryFree(p2); } numOfRows += 1; @@ -150,10 +168,13 @@ void mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBo } else { if (pBitmap == NULL) { pBitmap = taosMemoryCalloc(1, bmLen); + if (NULL == pBitmap) { + MJ_ERR_RET(terrno); + } } - memcpy(pBitmap, pDst->nullbitmap, bmLen); - memset(pDst->nullbitmap, 0, bmLen); + TAOS_MEMCPY(pBitmap, pDst->nullbitmap, bmLen); + TAOS_MEMSET(pDst->nullbitmap, 0, bmLen); int32_t j = 0; @@ -243,6 +264,8 @@ void mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBo if (pBitmap != NULL) { taosMemoryFree(pBitmap); } + + return TSDB_CODE_SUCCESS; } @@ -290,8 +313,10 @@ int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo code = TSDB_CODE_SUCCESS; _err: + colDataDestroy(p); taosMemoryFree(p); + return code; } @@ -300,18 +325,19 @@ int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SM return TSDB_CODE_SUCCESS; } + int32_t code = TSDB_CODE_SUCCESS; SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; SColumnInfoData* p = NULL; - int32_t code = filterSetDataFromSlotId(pFilterInfo, ¶m1); + code = filterSetDataFromSlotId(pFilterInfo, ¶m1); if (code != TSDB_CODE_SUCCESS) { - goto _err; + goto _return; } int32_t status = 0; code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status); if (code != TSDB_CODE_SUCCESS) { - goto _err; + goto _return; } int32_t rowNum = 0; @@ -320,6 +346,9 @@ int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SM if (status == FILTER_RESULT_ALL_QUALIFIED || status == FILTER_RESULT_PARTIAL_QUALIFIED) { for (int32_t i = startGrpIdx; i < grpNum && rowNum < pBlock->info.rows; startRowIdx = 0, ++i) { SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, i); + if (NULL == buildGrp) { + MJ_ERR_JRET(terrno); + } if (buildGrp->allRowsMatch) { rowNum += buildGrp->endIdx - startRowIdx + 1; continue; @@ -350,9 +379,11 @@ int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SM code = TSDB_CODE_SUCCESS; -_err: +_return: + colDataDestroy(p); taosMemoryFree(p); + return code; } @@ -366,29 +397,31 @@ int32_t mJoinFilterAndKeepSingleRow(SSDataBlock* pBlock, SFilterInfo* pFilterInf int32_t code = filterSetDataFromSlotId(pFilterInfo, ¶m1); if (code != TSDB_CODE_SUCCESS) { - goto _err; + goto _return; } int32_t status = 0; code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status); if (code != TSDB_CODE_SUCCESS) { - goto _err; + goto _return; } if (status == FILTER_RESULT_ALL_QUALIFIED) { pBlock->info.rows = 1; - mJoinTrimKeepFirstRow(pBlock); + MJ_ERR_JRET(mJoinTrimKeepFirstRow(pBlock)); } else if (status == FILTER_RESULT_NONE_QUALIFIED) { pBlock->info.rows = 0; } else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) { - mJoinTrimKeepOneRow(pBlock, pBlock->info.rows, (bool*)p->pData); + MJ_ERR_JRET(mJoinTrimKeepOneRow(pBlock, pBlock->info.rows, (bool*)p->pData)); } code = TSDB_CODE_SUCCESS; -_err: +_return: + colDataDestroy(p); taosMemoryFree(p); + return code; } @@ -418,8 +451,10 @@ int32_t mJoinFilterAndNoKeepRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo) code = TSDB_CODE_SUCCESS; _err: + colDataDestroy(p); taosMemoryFree(p); + return code; } @@ -480,12 +515,20 @@ int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool app SMJoinColMap* pFirstCol = probe->finCols + c; SColumnInfoData* pInCol = taosArrayGet(pGrp->blk->pDataBlock, pFirstCol->srcSlot); SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot); - colDataAssignNRows(pOutCol, currRows, pInCol, pGrp->readIdx, firstRows); + if (NULL == pInCol || NULL == pOutCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + + MJ_ERR_RET(colDataAssignNRows(pOutCol, currRows, pInCol, pGrp->readIdx, firstRows)); } for (int32_t c = 0; c < build->finNum; ++c) { SMJoinColMap* pSecondCol = build->finCols + c; SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot); + if (NULL == pOutCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + colDataSetNItemsNull(pOutCol, currRows, firstRows); } @@ -536,6 +579,10 @@ int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool app SMJoinColMap* pFirstCol = probe->finCols + c; SColumnInfoData* pInCol = taosArrayGet(pFirst->blk->pDataBlock, pFirstCol->srcSlot); SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot); + if (NULL == pInCol || NULL == pOutCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + for (int32_t r = 0; r < firstRows; ++r) { if (colDataIsNull_s(pInCol, pFirst->readIdx + r)) { colDataSetNItemsNull(pOutCol, currRows + r * secondRows, secondRows); @@ -543,7 +590,7 @@ int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool app ASSERT(pRes->info.capacity >= (pRes->info.rows + firstRows * secondRows)); uint32_t startOffset = (IS_VAR_DATA_TYPE(pOutCol->info.type)) ? pOutCol->varmeta.length : ((currRows + r * secondRows) * pOutCol->info.bytes); ASSERT((startOffset + 1 * pOutCol->info.bytes) <= pRes->info.capacity * pOutCol->info.bytes); - colDataSetNItems(pOutCol, currRows + r * secondRows, colDataGetData(pInCol, pFirst->readIdx + r), secondRows, true); + MJ_ERR_RET(colDataSetNItems(pOutCol, currRows + r * secondRows, colDataGetData(pInCol, pFirst->readIdx + r), secondRows, true)); } } } @@ -552,28 +599,40 @@ int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool app SMJoinColMap* pSecondCol = build->finCols + c; SColumnInfoData* pInCol = taosArrayGet(pSecond->blk->pDataBlock, pSecondCol->srcSlot); SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot); + if (NULL == pInCol || NULL == pOutCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + for (int32_t r = 0; r < firstRows; ++r) { - colDataAssignNRows(pOutCol, currRows + r * secondRows, pInCol, pSecond->readIdx, secondRows); + MJ_ERR_RET(colDataAssignNRows(pOutCol, currRows + r * secondRows, pInCol, pSecond->readIdx, secondRows)); } } pRes->info.rows = append ? (pRes->info.rows + firstRows * secondRows) : firstRows * secondRows; + return TSDB_CODE_SUCCESS; } -bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build) { +int32_t mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build, bool* cont) { + if (NULL != cont) { + *cont = false; + } + int32_t rowsLeft = append ? (pBlk->info.capacity - pBlk->info.rows) : pBlk->info.capacity; if (rowsLeft <= 0) { - return false; + return TSDB_CODE_SUCCESS; } int32_t buildGrpRows = taosArrayGetSize(build->pHashCurGrp); int32_t grpRows = buildGrpRows - build->grpRowIdx; if (grpRows <= 0 || build->grpRowIdx < 0) { build->grpRowIdx = -1; - return true; + if (NULL != cont) { + *cont = true; + } + return TSDB_CODE_SUCCESS; } int32_t actRows = TMIN(grpRows, rowsLeft); @@ -583,10 +642,14 @@ bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, S SMJoinColMap* pFirstCol = probe->finCols + c; SColumnInfoData* pInCol = taosArrayGet(probeGrp->blk->pDataBlock, pFirstCol->srcSlot); SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pFirstCol->dstSlot); + if (NULL == pInCol || NULL == pOutCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (colDataIsNull_s(pInCol, probeGrp->readIdx)) { colDataSetNItemsNull(pOutCol, currRows, actRows); } else { - colDataSetNItems(pOutCol, currRows, colDataGetData(pInCol, probeGrp->readIdx), actRows, true); + MJ_ERR_RET(colDataSetNItems(pOutCol, currRows, colDataGetData(pInCol, probeGrp->readIdx), actRows, true)); } } @@ -595,8 +658,16 @@ bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, S SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pSecondCol->dstSlot); for (int32_t r = 0; r < actRows; ++r) { SMJoinRowPos* pRow = taosArrayGet(build->pHashCurGrp, build->grpRowIdx + r); + if (NULL == pRow) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + SColumnInfoData* pInCol = taosArrayGet(pRow->pBlk->pDataBlock, pSecondCol->srcSlot); - colDataAssignNRows(pOutCol, currRows + r, pInCol, pRow->pos, 1); + if (NULL == pInCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + + MJ_ERR_RET(colDataAssignNRows(pOutCol, currRows + r, pInCol, pRow->pos, 1)); } } @@ -609,16 +680,24 @@ bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, S } if (actRows == rowsLeft) { - return false; + return TSDB_CODE_SUCCESS; } - return true; + if (NULL != cont) { + *cont = true; + } + + return TSDB_CODE_SUCCESS; } int32_t mJoinAllocGrpRowBitmap(SMJoinTableCtx* pTb) { int32_t grpNum = taosArrayGetSize(pTb->eqGrps); for (int32_t i = 0; i < grpNum; ++i) { SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayGet(pTb->eqGrps, i); + if (NULL == pGrp) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + MJ_ERR_RET(mJoinGetRowBitmapOffset(pTb, pGrp->endIdx - pGrp->beginIdx + 1, &pGrp->rowBitmapOffset)); pGrp->rowMatchNum = 0; } @@ -632,9 +711,9 @@ int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastB pCtx->lastEqGrp = true; - mJoinBuildEqGroups(pJoin->probe, timestamp, NULL, true); + MJ_ERR_RET(mJoinBuildEqGroups(pJoin->probe, timestamp, NULL, true)); if (!lastBuildGrp) { - mJoinRetrieveEqGrpRows(pJoin, pJoin->build, timestamp); + MJ_ERR_RET(mJoinRetrieveEqGrpRows(pJoin, pJoin->build, timestamp)); } else { pJoin->build->grpIdx = 0; } @@ -661,7 +740,7 @@ int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastB pCtx->hashJoin = false; if (!lastBuildGrp && pJoin->build->rowBitmapSize > 0) { - mJoinAllocGrpRowBitmap(pJoin->build); + MJ_ERR_RET(mJoinAllocGrpRowBitmap(pJoin->build)); } return (*pCtx->mergeCartFp)(pCtx); @@ -721,7 +800,7 @@ int32_t mJoinInitDownstreamInfo(SMJoinOperatorInfo* pInfo, SOperatorInfo*** pDow *newDownstreams = true; *pDownstream = mJoinBuildDownstreams(pInfo, *pDownstream); if (NULL == *pDownstream) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } *numOfDownstream = 2; } @@ -732,7 +811,7 @@ int32_t mJoinInitDownstreamInfo(SMJoinOperatorInfo* pInfo, SOperatorInfo*** pDow static int32_t mJoinInitPrimKeyInfo(SMJoinTableCtx* pTable, int32_t slotId) { pTable->primCol = taosMemoryMalloc(sizeof(SMJoinColInfo)); if (NULL == pTable->primCol) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } pTable->primCol->srcSlot = slotId; @@ -745,7 +824,7 @@ static int32_t mJoinInitColsInfo(int32_t* colNum, int64_t* rowSize, SMJoinColInf *pCols = taosMemoryMalloc((*colNum) * sizeof(SMJoinColInfo)); if (NULL == *pCols) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } *rowSize = 0; @@ -779,7 +858,7 @@ static int32_t mJoinInitKeyColsInfo(SMJoinTableCtx* pTable, SNodeList* pList, bo pTable->keyBuf = taosMemoryMalloc(TMAX(rowSize, pTable->keyNullSize)); if (NULL == pTable->keyBuf) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } } @@ -790,7 +869,7 @@ static int32_t mJoinInitKeyColsInfo(SMJoinTableCtx* pTable, SNodeList* pList, bo static int32_t mJoinInitFinColsInfo(SMJoinTableCtx* pTable, SNodeList* pList) { pTable->finCols = taosMemoryMalloc(LIST_LENGTH(pList) * sizeof(SMJoinColMap)); if (NULL == pTable->finCols) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } int32_t i = 0; @@ -837,8 +916,20 @@ static int32_t mJoinInitPrimExprCtx(SNode* pNode, SMJoinPrimExprCtx* pCtx, SMJoi } SValueNode* pUnit = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); - SValueNode* pCurrTz = (5 == pFunc->pParameterList->length) ? (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2) : NULL; + if (NULL == pUnit) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + SValueNode* pCurrTz = NULL; + if (5 == pFunc->pParameterList->length){ + pCurrTz = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2); + if (NULL == pCurrTz) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + } SValueNode* pTimeZone = (5 == pFunc->pParameterList->length) ? (SValueNode*)nodesListGetNode(pFunc->pParameterList, 4) : (SValueNode*)nodesListGetNode(pFunc->pParameterList, 3); + if (NULL == pTimeZone) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } pCtx->truncateUnit = pUnit->typeData; if ((NULL == pCurrTz || 1 == pCurrTz->typeData) && pCtx->truncateUnit >= (86400 * TSDB_TICK_PER_SECOND(pFunc->node.resType.precision))) { @@ -859,24 +950,32 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi MJ_ERR_RET(mJoinInitKeyColsInfo(pTable, (0 == idx) ? pJoinNode->pEqLeft : pJoinNode->pEqRight, JOIN_TYPE_FULL == pJoin->joinType)); MJ_ERR_RET(mJoinInitFinColsInfo(pTable, pJoinNode->pTargets)); - memcpy(&pTable->inputStat, pStat, sizeof(*pStat)); + TAOS_MEMCPY(&pTable->inputStat, pStat, sizeof(*pStat)); pTable->eqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows)); - //taosArrayReserve(pTable->eqGrps, 1); + if (NULL == pTable->eqGrps) { + return terrno; + } if (E_JOIN_TB_BUILD == pTable->type) { pTable->createdBlks = taosArrayInit(8, POINTER_BYTES); + if (NULL == pTable->createdBlks) { + return terrno; + } pTable->pGrpArrays = taosArrayInit(32, POINTER_BYTES); + if (NULL == pTable->pGrpArrays) { + return terrno; + } pTable->pGrpHash = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); - if (NULL == pTable->createdBlks || NULL == pTable->pGrpArrays || NULL == pTable->pGrpHash) { - return TSDB_CODE_OUT_OF_MEMORY; + if (NULL == pTable->pGrpHash) { + return terrno; } if (pJoin->pFPreFilter && IS_FULL_OUTER_JOIN(pJoin->joinType, pJoin->subType)) { pTable->rowBitmapSize = MJOIN_ROW_BITMAP_SIZE; pTable->pRowBitmap = taosMemoryMalloc(pTable->rowBitmapSize); if (NULL == pTable->pRowBitmap) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } } @@ -945,7 +1044,15 @@ int32_t mJoinLaunchPrimExpr(SSDataBlock* pBlock, SMJoinTableCtx* pTable) { SMJoinPrimExprCtx* pCtx = &pTable->primCtx; SColumnInfoData* pPrimIn = taosArrayGet(pBlock->pDataBlock, pTable->primCol->srcSlot); + if (NULL == pPrimIn) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + SColumnInfoData* pPrimOut = taosArrayGet(pBlock->pDataBlock, pTable->primCtx.targetSlotId); + if (NULL == pPrimOut) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + if (0 != pCtx->timezoneUnit) { for (int32_t i = 0; i < pBlock->info.rows; ++i) { ((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] - (((int64_t*)pPrimIn->pData)[i] + pCtx->timezoneUnit) % pCtx->truncateUnit; @@ -961,6 +1068,7 @@ int32_t mJoinLaunchPrimExpr(SSDataBlock* pBlock, SMJoinTableCtx* pTable) { SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) { SSDataBlock* pTmp = NULL; + int32_t code = TSDB_CODE_SUCCESS; int32_t dsIdx = pTable->downStreamIdx; if (E_JOIN_TB_PROBE == pTable->type) { if (pTable->remainInBlk) { @@ -1028,7 +1136,12 @@ SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTa _return: - mJoinLaunchPrimExpr(pTmp, pTable); + code = mJoinLaunchPrimExpr(pTmp, pTable); + if (code) { + pJoin->errCode = code; + T_LONG_JMP(pJoin->pOperator->pTaskInfo->env, pJoin->errCode); + } + return pTmp; } @@ -1041,7 +1154,11 @@ static FORCE_INLINE SSDataBlock* mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, SM if (NULL == pTmp) { pTable->dsFetchDone = true; } else { - mJoinLaunchPrimExpr(pTmp, pTable); + int32_t code = mJoinLaunchPrimExpr(pTmp, pTable); + if (code) { + pJoin->errCode = code; + T_LONG_JMP(pJoin->pOperator->pTaskInfo->env, pJoin->errCode); + } } return pTmp; @@ -1107,7 +1224,7 @@ bool mJoinRetrieveBlk(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** pp static void mJoinDestroyCreatedBlks(SArray* pCreatedBlks) { int32_t blkNum = taosArrayGetSize(pCreatedBlks); for (int32_t i = 0; i < blkNum; ++i) { - blockDataDestroy(*(SSDataBlock**)TARRAY_GET_ELEM(pCreatedBlks, i)); + (void)blockDataDestroy(*(SSDataBlock**)TARRAY_GET_ELEM(pCreatedBlks, i)); } taosArrayClear(pCreatedBlks); } @@ -1119,12 +1236,12 @@ int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t int64_t newSize = reqSize * 1.1; pTable->pRowBitmap = taosMemoryRealloc(pTable->pRowBitmap, newSize); if (NULL == pTable->pRowBitmap) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } pTable->rowBitmapSize = newSize; } - memset(pTable->pRowBitmap + pTable->rowBitmapOffset, 0xFFFFFFFF, bitmapLen); + TAOS_MEMSET(pTable->pRowBitmap + pTable->rowBitmapOffset, 0xFFFFFFFF, bitmapLen); *rowBitmapOffset = pTable->rowBitmapOffset; pTable->rowBitmapOffset += bitmapLen; @@ -1140,12 +1257,16 @@ void mJoinResetForBuildTable(SMJoinTableCtx* pTable) { taosArrayClear(pTable->eqGrps); if (pTable->rowBitmapSize > 0) { pTable->rowBitmapOffset = 1; - memset(&pTable->nMatchCtx, 0, sizeof(pTable->nMatchCtx)); + TAOS_MEMSET(&pTable->nMatchCtx, 0, sizeof(pTable->nMatchCtx)); } } int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) { SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId); + if (NULL == pCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + SMJoinGrpRows* pGrp = NULL; int32_t code = TSDB_CODE_SUCCESS; @@ -1159,7 +1280,10 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol bool keepGrp = true; pGrp = taosArrayReserve(pTable->eqGrps, 1); - + if (NULL == pGrp) { + MJ_ERR_RET(terrno); + } + pGrp->beginIdx = pTable->blkRowIdx++; pGrp->readIdx = pGrp->beginIdx; pGrp->endIdx = pGrp->beginIdx; @@ -1209,10 +1333,10 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol if (0 == pGrp->beginIdx && pTable->multiEqGrpRows && 0 == pTable->eqRowLimit) { pGrp->blk = createOneDataBlock(pTable->blk, true); if (NULL == pGrp->blk) { - MJ_ERR_JRET(terrno); + MJ_ERR_RET(terrno); } if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) { - MJ_ERR_JRET(terrno); + MJ_ERR_RET(terrno); } } else { if (!pTable->multiEqGrpRows) { @@ -1236,10 +1360,16 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol pTable->eqRowNum += rowNum; pGrp->blk = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, rowNum); + if (NULL == pGrp->blk) { + MJ_ERR_RET(terrno); + } + pGrp->endIdx -= pGrp->beginIdx; pGrp->beginIdx = 0; pGrp->readIdx = 0; - taosArrayPush(pTable->createdBlks, &pGrp->blk); + if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) { + MJ_ERR_RET(terrno); + } } } @@ -1248,19 +1378,19 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol _return: if (pTable->noKeepEqGrpRows || !keepGrp || (!pTable->multiEqGrpRows && !restart)) { - taosArrayPop(pTable->eqGrps); + (void)taosArrayPop(pTable->eqGrps); } else { pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1; } - return TSDB_CODE_SUCCESS; + return code; } int32_t mJoinRetrieveEqGrpRows(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable, int64_t timestamp) { bool wholeBlk = false; - mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, true); + MJ_ERR_RET(mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, true)); while (wholeBlk && !pTable->dsFetchDone) { pTable->blk = (*pJoin->retrieveFp)(pJoin, pTable); @@ -1273,7 +1403,7 @@ int32_t mJoinRetrieveEqGrpRows(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable } wholeBlk = false; - mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, false); + MJ_ERR_RET(mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, false)); } return TSDB_CODE_SUCCESS; @@ -1282,6 +1412,10 @@ int32_t mJoinRetrieveEqGrpRows(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable) { for (int32_t i = 0; i < pTable->keyNum; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot); + if (NULL == pCol) { + MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } + if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->keyCols[i].srcSlot, pCol->info.type, pTable->keyCols[i].vardata); return TSDB_CODE_INVALID_PARA; @@ -1326,15 +1460,15 @@ bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *p } if (pTable->keyCols[0].jsonData) { pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx]; - memcpy(pTable->keyBuf + bufLen, pData, getJsonValueLen(pData)); + TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, getJsonValueLen(pData)); bufLen += getJsonValueLen(pData); } else if (pTable->keyCols[i].vardata) { pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx]; - memcpy(pTable->keyBuf + bufLen, pData, varDataTLen(pData)); + TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, varDataTLen(pData)); bufLen += varDataTLen(pData); } else { pData = pTable->keyCols[i].data + pTable->keyCols[i].bytes * rowIdx; - memcpy(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes); + TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes); bufLen += pTable->keyCols[i].bytes; } } @@ -1352,15 +1486,20 @@ static int32_t mJoinGetAvailableGrpArray(SMJoinTableCtx* pTable, SArray** ppRes) do { if (pTable->grpArrayIdx < taosArrayGetSize(pTable->pGrpArrays)) { *ppRes = taosArrayGetP(pTable->pGrpArrays, pTable->grpArrayIdx++); + if (NULL == *ppRes) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } taosArrayClear(*ppRes); return TSDB_CODE_SUCCESS; } SArray* pNew = taosArrayInit(4, sizeof(SMJoinRowPos)); if (NULL == pNew) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + if (NULL == taosArrayPush(pTable->pGrpArrays, &pNew)) { + return terrno; } - taosArrayPush(pTable->pGrpArrays, &pNew); } while (true); return TSDB_CODE_SUCCESS; @@ -1374,10 +1513,14 @@ static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDat SArray* pNewGrp = NULL; MJ_ERR_RET(mJoinGetAvailableGrpArray(pBuild, &pNewGrp)); - taosArrayPush(pNewGrp, &pos); - tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, POINTER_BYTES); + if (NULL == taosArrayPush(pNewGrp, &pos)) { + return terrno; + } + MJ_ERR_RET(tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, POINTER_BYTES)); } else if (pBuild->multiRowsGrp) { - taosArrayPush(*pGrpRows, &pos); + if (NULL == taosArrayPush(*pGrpRows, &pos)) { + return terrno; + } } return TSDB_CODE_SUCCESS; @@ -1392,10 +1535,14 @@ static int32_t mJoinAddRowToFullHash(SMJoinOperatorInfo* pJoin, size_t keyLen, S SMJoinHashGrpRows pNewGrp = {0}; MJ_ERR_RET(mJoinGetAvailableGrpArray(pBuild, &pNewGrp.pRows)); - taosArrayPush(pNewGrp.pRows, &pos); - tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, sizeof(pNewGrp)); + if (NULL == taosArrayPush(pNewGrp.pRows, &pos)) { + return terrno; + } + MJ_ERR_RET(tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, sizeof(pNewGrp))); } else { - taosArrayPush(pGrpRows->pRows, &pos); + if (NULL == taosArrayPush(pGrpRows->pRows, &pos)) { + return terrno; + } } return TSDB_CODE_SUCCESS; @@ -1412,6 +1559,9 @@ int32_t mJoinCreateFullBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pT int32_t grpNum = taosArrayGetSize(pTable->eqGrps); for (int32_t g = 0; g < grpNum; ++g) { SMJoinGrpRows* pGrp = taosArrayGet(pTable->eqGrps, g); + if (NULL == pGrp) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } MJ_ERR_RET(mJoinSetKeyColsData(pGrp->blk, pTable)); int32_t grpRows = GRP_REMAIN_ROWS(pGrp); @@ -1440,6 +1590,10 @@ int32_t mJoinCreateBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable int32_t grpNum = taosArrayGetSize(pTable->eqGrps); for (int32_t g = 0; g < grpNum; ++g) { SMJoinGrpRows* pGrp = taosArrayGet(pTable->eqGrps, g); + if (NULL == pGrp) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + MJ_ERR_RET(mJoinSetKeyColsData(pGrp->blk, pTable)); int32_t grpRows = GRP_REMAIN_ROWS(pGrp); @@ -1496,8 +1650,11 @@ void mWinJoinResetWindowCache(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache) { for (int32_t i = 0; i < grpNum; ++i) { SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i); + if (NULL == pGrp) { + continue; + } if (pGrp->blk != pCtx->cache.outBlk && pGrp->clonedBlk) { - blockDataDestroy(pGrp->blk); + (void)blockDataDestroy(pGrp->blk); } } @@ -1539,6 +1696,7 @@ void mJoinResetOperator(struct SOperatorInfo* pOperator) { SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoin = pOperator->info; + int32_t code = TSDB_CODE_SUCCESS; if (pOperator->status == OP_EXEC_DONE) { if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] || NULL == pOperator->pDownstreamGetParams[1]) { qDebug("%s merge join done", GET_TASKID(pOperator->pTaskInfo)); @@ -1567,7 +1725,11 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { pBlock->info.id.blockId = pJoin->outBlkId; if (pJoin->pFinFilter != NULL) { - doFilter(pBlock, pJoin->pFinFilter, NULL); + code = doFilter(pBlock, pJoin->pFinFilter, NULL); + if (code) { + pJoin->errCode = code; + T_LONG_JMP(pOperator->pTaskInfo->env, pJoin->errCode); + } } if (pBlock->info.rows > 0 || pOperator->status == OP_EXEC_DONE) { @@ -1703,13 +1865,17 @@ int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) { SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { - SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); bool newDownstreams = false; - int32_t code = TSDB_CODE_SUCCESS; - if (pOperator == NULL || pInfo == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo)); + if (pInfo == NULL) { + code = terrno; + goto _return; + } + + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + code = terrno; goto _return; } @@ -1722,8 +1888,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t MJ_ERR_JRET(mJoinHandleConds(pInfo, pJoinNode)); - mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0], newDownstreams); - mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1], newDownstreams); + MJ_ERR_JRET(mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0], newDownstreams)); + MJ_ERR_JRET(mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1], newDownstreams)); MJ_ERR_JRET(mJoinInitCtx(pInfo, pJoinNode)); MJ_ERR_JRET(mJoinSetImplFp(pInfo)); @@ -1742,6 +1908,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t return pOperator; _return: + if (pInfo != NULL) { destroyMergeJoinOperator(pInfo); } @@ -1751,6 +1918,7 @@ _return: taosMemoryFree(pOperator); pTaskInfo->code = code; + return NULL; }