diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 91583d03ed..95d415f85c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -490,7 +490,7 @@ static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDa } if (pBlock == NULL) { - code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + code = TSDB_CODE_INVALID_PARA; return code; } @@ -782,11 +782,14 @@ static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, ret); } // window start key interpolation - doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup); + ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup); + if (ret != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, ret); + } } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1); @@ -814,7 +817,10 @@ static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->binfo.inputTsOrder); // window start(end) key interpolation - doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); + code = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } // TODO: add to open window? how to close the open windows after input blocks exhausted? #if 0 if ((ascScan && ekey <= pBlock->info.window.ekey) || @@ -2253,11 +2259,14 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, ret); } // window start key interpolation - doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup); + ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup); + if (ret != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, ret); + } } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1); @@ -2294,7 +2303,10 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* iaInfo->binfo.inputTsOrder); // window start(end) key interpolation - doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup); + code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1); applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,