fix: func return value

This commit is contained in:
Shengliang Guan 2024-10-15 11:00:18 +08:00
parent 3003e7286e
commit c723452f83
2 changed files with 28 additions and 15 deletions

View File

@ -420,7 +420,7 @@ _OVER:
return code; return code;
} }
static void anomalyAggregateRows(SOperatorInfo* pOperator, SSDataBlock* pBlock) { static int32_t anomalyAggregateRows(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SAnomalyWindowOperatorInfo* pInfo = pOperator->info; SAnomalyWindowOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pExprSup = &pOperator->exprSupp; SExprSupp* pExprSup = &pOperator->exprSupp;
@ -429,14 +429,17 @@ static void anomalyAggregateRows(SOperatorInfo* pOperator, SSDataBlock* pBlock)
SResultRow* pResRow = pSupp->pResultRow; SResultRow* pResRow = pSupp->pResultRow;
int32_t numOfOutput = pOperator->exprSupp.numOfExprs; int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
if (setResultRowInitCtx(pResRow, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset) == 0) { int32_t code = setResultRowInitCtx(pResRow, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
if (code == 0) {
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pSupp->curWin, 0); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pSupp->curWin, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput); pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
} }
return code;
} }
static void anomalyBuildResult(SOperatorInfo* pOperator) { static int32_t anomalyBuildResult(SOperatorInfo* pOperator) {
SAnomalyWindowOperatorInfo* pInfo = pOperator->info; SAnomalyWindowOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pExprSup = &pOperator->exprSupp; SExprSupp* pExprSup = &pOperator->exprSupp;
@ -444,10 +447,14 @@ static void anomalyBuildResult(SOperatorInfo* pOperator) {
SResultRow* pResRow = pInfo->anomalySup.pResultRow; SResultRow* pResRow = pInfo->anomalySup.pResultRow;
doUpdateNumOfRows(pExprSup->pCtx, pResRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); doUpdateNumOfRows(pExprSup->pCtx, pResRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pResRow, pExprSup->pCtx, pRes, int32_t code = copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pResRow, pExprSup->pCtx, pRes,
pExprSup->rowEntryInfoOffset, pTaskInfo); pExprSup->rowEntryInfoOffset, pTaskInfo);
if (code == 0) {
pRes->info.rows += pResRow->numOfRows; pRes->info.rows += pResRow->numOfRows;
}
clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs); clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs);
return code;
} }
static void anomalyAggregateBlocks(SOperatorInfo* pOperator) { static void anomalyAggregateBlocks(SOperatorInfo* pOperator) {
@ -540,13 +547,15 @@ static void anomalyAggregateBlocks(SOperatorInfo* pOperator) {
if (rowsInBlock > 0) { if (rowsInBlock > 0) {
qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, agg", pSupp->groupId, qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, agg", pSupp->groupId,
b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock); b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock);
anomalyAggregateRows(pOperator, pBlock); code = anomalyAggregateRows(pOperator, pBlock);
QUERY_CHECK_CODE(code, lino, _OVER);
rowsInBlock = 0; rowsInBlock = 0;
} }
if (rowsInWin > 0) { if (rowsInWin > 0) {
qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, build result", qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, build result",
pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock); pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock);
anomalyBuildResult(pOperator); code = anomalyBuildResult(pOperator);
QUERY_CHECK_CODE(code, lino, _OVER);
rowsInWin = 0; rowsInWin = 0;
} }
if (anomalyFindWindow(pSupp, tsList[r]) == 0) { if (anomalyFindWindow(pSupp, tsList[r]) == 0) {
@ -567,7 +576,8 @@ static void anomalyAggregateBlocks(SOperatorInfo* pOperator) {
if (lastRow && rowsInBlock > 0) { if (lastRow && rowsInBlock > 0) {
qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, agg since lastrow", qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, agg since lastrow",
pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock); pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock);
anomalyAggregateRows(pOperator, pBlock); code = anomalyAggregateRows(pOperator, pBlock);
QUERY_CHECK_CODE(code, lino, _OVER);
rowsInBlock = 0; rowsInBlock = 0;
} }
} }
@ -575,7 +585,8 @@ static void anomalyAggregateBlocks(SOperatorInfo* pOperator) {
if (lastBlock && rowsInWin > 0) { if (lastBlock && rowsInWin > 0) {
qTrace("group:%" PRId64 ", block:%d win:%d, riwin:%d riblock:%d, build result since lastblock", pSupp->groupId, b, qTrace("group:%" PRId64 ", block:%d win:%d, riwin:%d riblock:%d, build result since lastblock", pSupp->groupId, b,
pSupp->curWinIndex, rowsInWin, rowsInBlock); pSupp->curWinIndex, rowsInWin, rowsInBlock);
anomalyBuildResult(pOperator); code = anomalyBuildResult(pOperator);
QUERY_CHECK_CODE(code, lino, _OVER);
rowsInWin = 0; rowsInWin = 0;
} }
} }

View File

@ -351,7 +351,8 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} else { } else {
qDebug("group:%" PRId64 ", read finish for new group coming, blocks:%d", pSupp->groupId, numOfBlocks); qDebug("group:%" PRId64 ", read finish for new group coming, blocks:%d", pSupp->groupId, numOfBlocks);
forecastAggregateBlocks(pSupp, pResBlock); code = forecastAggregateBlocks(pSupp, pResBlock);
QUERY_CHECK_CODE(code, lino, _end);
pSupp->groupId = pBlock->info.id.groupId; pSupp->groupId = pBlock->info.id.groupId;
numOfBlocks = 1; numOfBlocks = 1;
qDebug("group:%" PRId64 ", new group, cache block rows:%" PRId64, pSupp->groupId, pBlock->info.rows); qDebug("group:%" PRId64 ", new group, cache block rows:%" PRId64, pSupp->groupId, pBlock->info.rows);
@ -368,7 +369,8 @@ static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (numOfBlocks > 0) { if (numOfBlocks > 0) {
qDebug("group:%" PRId64 ", read finish, blocks:%d", pSupp->groupId, numOfBlocks); qDebug("group:%" PRId64 ", read finish, blocks:%d", pSupp->groupId, numOfBlocks);
forecastAggregateBlocks(pSupp, pResBlock); code = forecastAggregateBlocks(pSupp, pResBlock);
QUERY_CHECK_CODE(code, lino, _end);
} }
int64_t cost = taosGetTimestampUs() - st; int64_t cost = taosGetTimestampUs() - st;
@ -556,7 +558,7 @@ static int32_t forecastCreateBuf(SForecastSupp* pSupp) {
_OVER: _OVER:
if (code != 0) { if (code != 0) {
taosAnalBufClose(pBuf); (void)taosAnalBufClose(pBuf);
taosAnalBufDestroy(pBuf); taosAnalBufDestroy(pBuf);
} }
return code; return code;