From c723452f83458010969ba5b734358f9d791f6ca5 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 15 Oct 2024 11:00:18 +0800 Subject: [PATCH] fix: func return value --- .../libs/executor/src/anomalywindowoperator.c | 35 ++++++++++++------- source/libs/executor/src/forecastoperator.c | 8 +++-- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index 7267bbbe09..25d3dbd423 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -420,7 +420,7 @@ _OVER: return code; } -static void anomalyAggregateRows(SOperatorInfo* pOperator, SSDataBlock* pBlock) { +static int32_t anomalyAggregateRows(SOperatorInfo* pOperator, SSDataBlock* pBlock) { SAnomalyWindowOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExprSupp* pExprSup = &pOperator->exprSupp; @@ -429,14 +429,17 @@ static void anomalyAggregateRows(SOperatorInfo* pOperator, SSDataBlock* pBlock) SResultRow* pResRow = pSupp->pResultRow; int32_t numOfOutput = pOperator->exprSupp.numOfExprs; - if (setResultRowInitCtx(pResRow, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset) == 0) { + int32_t code = setResultRowInitCtx(pResRow, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); + if (code == 0) { updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pSupp->curWin, 0); - applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, pBlock->info.rows, numOfOutput); + code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, + pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput); } + + return code; } -static void anomalyBuildResult(SOperatorInfo* pOperator) { +static int32_t anomalyBuildResult(SOperatorInfo* pOperator) { SAnomalyWindowOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExprSupp* pExprSup = &pOperator->exprSupp; @@ -444,10 +447,14 @@ static void anomalyBuildResult(SOperatorInfo* pOperator) { SResultRow* pResRow = pInfo->anomalySup.pResultRow; doUpdateNumOfRows(pExprSup->pCtx, pResRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); - copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pResRow, pExprSup->pCtx, pRes, - pExprSup->rowEntryInfoOffset, pTaskInfo); - pRes->info.rows += pResRow->numOfRows; + int32_t code = copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pResRow, pExprSup->pCtx, pRes, + pExprSup->rowEntryInfoOffset, pTaskInfo); + if (code == 0) { + pRes->info.rows += pResRow->numOfRows; + } + clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs); + return code; } static void anomalyAggregateBlocks(SOperatorInfo* pOperator) { @@ -540,13 +547,15 @@ static void anomalyAggregateBlocks(SOperatorInfo* pOperator) { if (rowsInBlock > 0) { qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, agg", pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock); - anomalyAggregateRows(pOperator, pBlock); + code = anomalyAggregateRows(pOperator, pBlock); + QUERY_CHECK_CODE(code, lino, _OVER); rowsInBlock = 0; } if (rowsInWin > 0) { qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, build result", pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock); - anomalyBuildResult(pOperator); + code = anomalyBuildResult(pOperator); + QUERY_CHECK_CODE(code, lino, _OVER); rowsInWin = 0; } if (anomalyFindWindow(pSupp, tsList[r]) == 0) { @@ -567,7 +576,8 @@ static void anomalyAggregateBlocks(SOperatorInfo* pOperator) { if (lastRow && rowsInBlock > 0) { qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, agg since lastrow", pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock); - anomalyAggregateRows(pOperator, pBlock); + code = anomalyAggregateRows(pOperator, pBlock); + QUERY_CHECK_CODE(code, lino, _OVER); rowsInBlock = 0; } } @@ -575,7 +585,8 @@ static void anomalyAggregateBlocks(SOperatorInfo* pOperator) { if (lastBlock && rowsInWin > 0) { qTrace("group:%" PRId64 ", block:%d win:%d, riwin:%d riblock:%d, build result since lastblock", pSupp->groupId, b, pSupp->curWinIndex, rowsInWin, rowsInBlock); - anomalyBuildResult(pOperator); + code = anomalyBuildResult(pOperator); + QUERY_CHECK_CODE(code, lino, _OVER); rowsInWin = 0; } } diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c index 599678106c..0c800eeab5 100644 --- a/source/libs/executor/src/forecastoperator.c +++ b/source/libs/executor/src/forecastoperator.c @@ -351,7 +351,8 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { QUERY_CHECK_CODE(code, lino, _end); } else { qDebug("group:%" PRId64 ", read finish for new group coming, blocks:%d", pSupp->groupId, numOfBlocks); - forecastAggregateBlocks(pSupp, pResBlock); + code = forecastAggregateBlocks(pSupp, pResBlock); + QUERY_CHECK_CODE(code, lino, _end); pSupp->groupId = pBlock->info.id.groupId; numOfBlocks = 1; qDebug("group:%" PRId64 ", new group, cache block rows:%" PRId64, pSupp->groupId, pBlock->info.rows); @@ -368,7 +369,8 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { if (numOfBlocks > 0) { qDebug("group:%" PRId64 ", read finish, blocks:%d", pSupp->groupId, numOfBlocks); - forecastAggregateBlocks(pSupp, pResBlock); + code = forecastAggregateBlocks(pSupp, pResBlock); + QUERY_CHECK_CODE(code, lino, _end); } int64_t cost = taosGetTimestampUs() - st; @@ -556,7 +558,7 @@ static int32_t forecastCreateBuf(SForecastSupp* pSupp) { _OVER: if (code != 0) { - taosAnalBufClose(pBuf); + (void)taosAnalBufClose(pBuf); taosAnalBufDestroy(pBuf); } return code;