diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 5e5d6494cf..fa9dc79cc3 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1230,7 +1230,7 @@ void destroyIntervalOperatorInfo(void* param) { } static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo, - SExecTaskInfo* pTaskInfo) { + bool* pRes) { // the primary timestamp column bool needed = false; int32_t code = TSDB_CODE_SUCCESS; @@ -1298,10 +1298,9 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); - pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); } - return needed; + *pRes = needed; + return code; } int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, @@ -1390,7 +1389,10 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); QUERY_CHECK_CODE(code, lino, _error); - pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, pTaskInfo); + + pInfo->timeWindowInterpo = false; + code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo); + QUERY_CHECK_CODE(code, lino, _error); if (pInfo->timeWindowInterpo) { pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo)); if (pInfo->binfo.resultRowInfo.openWindow == NULL) { @@ -2085,7 +2087,9 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); QUERY_CHECK_CODE(code, lino, _error); - iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, pTaskInfo); + iaInfo->timeWindowInterpo = false; + code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo); + QUERY_CHECK_CODE(code, lino, _error); if (iaInfo->timeWindowInterpo) { iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo)); } @@ -2416,7 +2420,9 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win); QUERY_CHECK_CODE(code, lino, _error); - pIntervalInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, pTaskInfo); + pIntervalInfo->timeWindowInterpo = false; + code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo); + QUERY_CHECK_CODE(code, lino, _error); if (pIntervalInfo->timeWindowInterpo) { pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo)); if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {