diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 48adb22927..4a2d6bfcd3 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -844,8 +844,8 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo); bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); -void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, - int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); +int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, + int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart); void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs, @@ -916,8 +916,8 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag); SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs); void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); -void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx, - SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo); +int32_t copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx, + SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo); void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset); void doClearBufferedBlocks(SStreamScanInfo* pInfo); diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 4605d19464..f1c9f1d32b 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -613,8 +613,9 @@ int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo return TSDB_CODE_SUCCESS; } -void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, - int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) { +int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, + int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) { + int32_t code = TSDB_CODE_SUCCESS; for (int32_t k = 0; k < numOfOutput; ++k) { // keep it temporarily SFunctionCtxStatus status = {0}; @@ -641,15 +642,14 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC SScalarParam out = {.columnData = &idata}; SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData}; - int32_t code = pCtx[k].sfp.process(&tw, 1, &out); + code = pCtx[k].sfp.process(&tw, 1, &out); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); taskInfo->code = code; - T_LONG_JMP(taskInfo->env, code); + return code; } pEntryInfo->numOfRes = 1; } else { - int32_t code = TSDB_CODE_SUCCESS; if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) { if ((&pCtx[k])->input.pData[0] == NULL) { code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; @@ -664,7 +664,7 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC } qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code)); taskInfo->code = code; - T_LONG_JMP(taskInfo->env, code); + return code; } } @@ -672,6 +672,7 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC functionCtxRestore(&pCtx[k], &status); } } + return code; } void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 2233d58ef8..542a7c89a9 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -149,8 +149,9 @@ void doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) { pInfo->pRow->win.ekey = tsCols[num + i - 1]; updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->pRow->win, 0); - applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, i, num, - pBlock->info.rows, pExprSup->numOfExprs); + code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, i, num, + pBlock->info.rows, pExprSup->numOfExprs); + QUERY_CHECK_CODE(code, lino, _end); if (pInfo->windowCount != pInfo->windowSliding) { if (prevRows <= pInfo->windowSliding) { if (pBuffInfo->winRows > pInfo->windowSliding) { @@ -164,8 +165,9 @@ void doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } if (pBuffInfo->winRows == pInfo->windowCount) { doUpdateNumOfRows(pExprSup->pCtx, pInfo->pRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); - copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pInfo->pRow, pExprSup->pCtx, pRes, - pExprSup->rowEntryInfoOffset, pTaskInfo); + code = copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pInfo->pRow, pExprSup->pCtx, pRes, + pExprSup->rowEntryInfoOffset, pTaskInfo); + QUERY_CHECK_CODE(code, lino, _end); pRes->info.rows += pInfo->pRow->numOfRows; clearWinStateBuff(pBuffInfo); pInfo->preStateIndex = pInfo->countSup.curStateIndex; @@ -205,8 +207,9 @@ static void buildCountResult(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, S continue; } doUpdateNumOfRows(pExprSup->pCtx, pResultRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); - copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pResultRow, pExprSup->pCtx, pBlock, - pExprSup->rowEntryInfoOffset, pTaskInfo); + code = copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pResultRow, pExprSup->pCtx, pBlock, + pExprSup->rowEntryInfoOffset, pTaskInfo); + QUERY_CHECK_CODE(code, lino, _end); pBlock->info.rows += pResultRow->numOfRows; clearWinStateBuff(pBuff); clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs); diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 0f3a08c14b..e164e07252 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -260,8 +260,10 @@ static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWi return setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); } -static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSup, int32_t startIndex, int32_t endIndex, - const SSDataBlock* pBlock, int64_t* tsList, SExecTaskInfo* pTaskInfo) { +static int32_t doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSup, int32_t startIndex, + int32_t endIndex, const SSDataBlock* pBlock, int64_t* tsList, + SExecTaskInfo* pTaskInfo) { + int32_t code = TSDB_CODE_SUCCESS; SWindowRowsSup* pRowSup = &pInfo->winSup; int32_t numOfOutput = pSup->numOfExprs; @@ -269,15 +271,16 @@ static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSu doKeepTuple(pRowSup, tsList[endIndex], pBlock->info.id.groupId); - int32_t ret = - setSingleOutputTupleBufv1(&pInfo->binfo.resultRowInfo, &pRowSup->win, &pInfo->pRow, pSup, &pInfo->aggSup); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); + code = setSingleOutputTupleBufv1(&pInfo->binfo.resultRowInfo, &pRowSup->win, &pInfo->pRow, pSup, &pInfo->aggSup); + if (code != TSDB_CODE_SUCCESS) { // null data, too many state code + qError("failed to set single output tuple buffer, code:%d", code); + return code; } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startIndex, numOfRows, - pBlock->info.rows, numOfOutput); + code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startIndex, numOfRows, + pBlock->info.rows, numOfOutput); + return code; } int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { @@ -287,11 +290,11 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p SExprSupp* pSup = &pOperator->exprSupp; SSDataBlock* pRes = pInfo->binfo.pRes; int64_t gid = pBlock->info.id.groupId; + SColumnInfoData *ps = NULL, *pe = NULL; SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); QUERY_CHECK_NULL(pColInfoData, code, lino, _return, terrno); TSKEY* tsList = (TSKEY*)pColInfoData->pData; SWindowRowsSup* pRowSup = &pInfo->winSup; - SColumnInfoData *ps = NULL, *pe = NULL; int32_t rowIndex = 0; pRowSup->numOfRows = 0; @@ -333,7 +336,8 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p } if (rowIndex < pBlock->info.rows) { - doEventWindowAggImpl(pInfo, pSup, startIndex, rowIndex, pBlock, tsList, pTaskInfo); + code = doEventWindowAggImpl(pInfo, pSup, startIndex, rowIndex, pBlock, tsList, pTaskInfo); + QUERY_CHECK_CODE(code, lino, _return); doUpdateNumOfRows(pSup->pCtx, pInfo->pRow, pSup->numOfExprs, pSup->rowEntryInfoOffset); // check buffer size @@ -343,8 +347,9 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p QUERY_CHECK_CODE(code, lino, _return); } - copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pRes, - pSup->rowEntryInfoOffset, pTaskInfo); + code = copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pRes, + pSup->rowEntryInfoOffset, pTaskInfo); + QUERY_CHECK_CODE(code, lino, _return); pRes->info.rows += pInfo->pRow->numOfRows; pInfo->pRow->numOfRows = 0; @@ -352,7 +357,8 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p pInfo->inWindow = false; rowIndex += 1; } else { - doEventWindowAggImpl(pInfo, pSup, startIndex, pBlock->info.rows - 1, pBlock, tsList, pTaskInfo); + code = doEventWindowAggImpl(pInfo, pSup, startIndex, pBlock->info.rows - 1, pBlock, tsList, pTaskInfo); + QUERY_CHECK_CODE(code, lino, _return); } } else { // find the first start value that is fulfill for the start condition for (; rowIndex < pBlock->info.rows; ++rowIndex) { diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index b5c05c8558..07224961f9 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -666,8 +666,8 @@ void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExpr } } -void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx, - SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) { +int32_t copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx, + SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; for (int32_t j = 0; j < numOfExprs; ++j) { @@ -690,7 +690,7 @@ void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultR int32_t winCode = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); if (TAOS_FAILED(winCode)) { qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(winCode)); - T_LONG_JMP(pTaskInfo->env, winCode); + QUERY_CHECK_CODE(winCode, lino, _end); } } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { // do nothing @@ -710,8 +710,8 @@ void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultR _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); - T_LONG_JMP(pTaskInfo->env, code); } + return code; } // todo refactor. SResultRow has direct pointer in miainfo @@ -747,7 +747,12 @@ void finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPositi T_LONG_JMP(pTaskInfo->env, code); } - copyResultrowToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); + code = copyResultrowToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); + if (TAOS_FAILED(code)) { + releaseBufPage(pBuf, page); + qError("%s copy result row to datablock failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); + T_LONG_JMP(pTaskInfo->env, code); + } releaseBufPage(pBuf, page); pBlock->info.rows += pRow->numOfRows; @@ -818,9 +823,9 @@ void doCopyToSDataBlockByHash(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SEx pGroupResInfo->iter = iter; pGroupResInfo->dataPos = pData; - copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); - + code = copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); releaseBufPage(pBuf, page); + QUERY_CHECK_CODE(code, lino, _end); pBlock->info.rows += pRow->numOfRows; if (pBlock->info.rows >= threshold) { break; @@ -892,9 +897,10 @@ void doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp } pGroupResInfo->index += 1; - copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); - + code = copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); releaseBufPage(pBuf, page); + QUERY_CHECK_CODE(code, lino, _end); + pBlock->info.rows += pRow->numOfRows; if (pBlock->info.rows >= threshold) { break; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 83a579615c..c862a44461 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -329,8 +329,11 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = j - num; - applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, - pOperator->exprSupp.numOfExprs); + ret = applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, + pOperator->exprSupp.numOfExprs); + if (ret != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, ret); + } // assign the group keys or user input constant values if required doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex); @@ -347,8 +350,11 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = pBlock->info.rows - num; - applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, - pOperator->exprSupp.numOfExprs); + ret = applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, + pOperator->exprSupp.numOfExprs); + if (ret != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, ret); + } doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex); } } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 09bf73c1ee..fb8d8d654e 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1172,8 +1172,9 @@ static int32_t doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, - pSDataBlock->info.rows, numOfOutput); + code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, + forwardRows, pSDataBlock->info.rows, numOfOutput); + QUERY_CHECK_CODE(code, lino, _end); key.ts = nextWin.skey; if (pInfo->delKey.ts > key.ts) { @@ -2442,7 +2443,7 @@ int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* QUERY_CHECK_CODE(code, lino, _end); updateTimeWindowInfo(pTimeWindowData, &pCurWin->sessionWin.win, winDelta); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, rows, numOutput); + code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, rows, numOutput); _end: if (code != TSDB_CODE_SUCCESS) { @@ -5474,8 +5475,9 @@ static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, - pSDataBlock->info.rows, numOfOutput); + code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, + forwardRows, pSDataBlock->info.rows, numOfOutput); + QUERY_CHECK_CODE(code, lino, _end); key.ts = nextWin.skey; if (pInfo->delKey.ts > key.ts) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 0cd506d15a..fa914d3ee8 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -653,8 +653,11 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, - pBlock->info.rows, numOfExprs); + ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, + pBlock->info.rows, numOfExprs); + if (ret != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, ret); + } if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { closeResultRow(pr); @@ -799,8 +802,11 @@ static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, + ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, numOfOutput); + if (ret != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, ret); + } doCloseWindow(pResultRowInfo, pInfo, pResult); @@ -838,8 +844,11 @@ static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } #endif updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, - pBlock->info.rows, numOfOutput); + ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, + pBlock->info.rows, numOfOutput); + if (ret != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, ret); + } doCloseWindow(pResultRowInfo, pInfo, pResult); } @@ -1031,8 +1040,11 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, pBlock->info.rows, numOfOutput); + ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, + pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput); + if (ret != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, ret); + } // here we start a new session window doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); @@ -1056,8 +1068,11 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, pBlock->info.rows, numOfOutput); + ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); + if (ret != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, ret); + } } static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) { @@ -1492,8 +1507,11 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator // pInfo->numOfRows data belong to the current session window updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, pBlock->info.rows, numOfOutput); + ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); + if (ret != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, ret); + } } // here we start a new session window @@ -1511,8 +1529,11 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, pBlock->info.rows, numOfOutput); + ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + pRowSup->numOfRows, pBlock->info.rows, numOfOutput); + if (ret != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, ret); + } } static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { @@ -1874,8 +1895,11 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, - currPos - startPos, pBlock->info.rows, pSup->numOfExprs); + ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, + currPos - startPos, pBlock->info.rows, pSup->numOfExprs); + if (ret != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, ret); + } finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo); resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow)); @@ -1894,8 +1918,11 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, - pBlock->info.rows, pSup->numOfExprs); + ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, + currPos - startPos, pBlock->info.rows, pSup->numOfExprs); + if (ret != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, ret); + } } static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) { @@ -2223,8 +2250,11 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1); - applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, + ret = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, numOfOutput); + if (ret != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, ret); + } doCloseWindow(pResultRowInfo, iaInfo, pResult); // output previous interval results after this interval (&win) is closed @@ -2262,8 +2292,11 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1); - applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, - pBlock->info.rows, numOfOutput); + code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, + forwardRows, pBlock->info.rows, numOfOutput); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } doCloseWindow(pResultRowInfo, iaInfo, pResult); // output previous interval results after this interval (&nextWin) is closed