fix:[TD-32262] Avoid mem leak when error occurs.

This commit is contained in:
Jing Sima 2024-09-23 11:19:10 +08:00
parent 5639fd0baf
commit 23b525458e
8 changed files with 124 additions and 67 deletions

View File

@ -844,8 +844,8 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo);
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart);
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
@ -916,8 +916,8 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag);
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs);
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo);
int32_t copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo);
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset);
void doClearBufferedBlocks(SStreamScanInfo* pInfo);

View File

@ -613,8 +613,9 @@ int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo
return TSDB_CODE_SUCCESS;
}
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t k = 0; k < numOfOutput; ++k) {
// keep it temporarily
SFunctionCtxStatus status = {0};
@ -641,15 +642,14 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC
SScalarParam out = {.columnData = &idata};
SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
int32_t code = pCtx[k].sfp.process(&tw, 1, &out);
code = pCtx[k].sfp.process(&tw, 1, &out);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
taskInfo->code = code;
T_LONG_JMP(taskInfo->env, code);
return code;
}
pEntryInfo->numOfRes = 1;
} else {
int32_t code = TSDB_CODE_SUCCESS;
if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
if ((&pCtx[k])->input.pData[0] == NULL) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
@ -664,7 +664,7 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC
}
qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
taskInfo->code = code;
T_LONG_JMP(taskInfo->env, code);
return code;
}
}
@ -672,6 +672,7 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC
functionCtxRestore(&pCtx[k], &status);
}
}
return code;
}
void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {

View File

@ -149,8 +149,9 @@ void doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
pInfo->pRow->win.ekey = tsCols[num + i - 1];
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->pRow->win, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, i, num,
pBlock->info.rows, pExprSup->numOfExprs);
code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, i, num,
pBlock->info.rows, pExprSup->numOfExprs);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->windowCount != pInfo->windowSliding) {
if (prevRows <= pInfo->windowSliding) {
if (pBuffInfo->winRows > pInfo->windowSliding) {
@ -164,8 +165,9 @@ void doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
if (pBuffInfo->winRows == pInfo->windowCount) {
doUpdateNumOfRows(pExprSup->pCtx, pInfo->pRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pInfo->pRow, pExprSup->pCtx, pRes,
pExprSup->rowEntryInfoOffset, pTaskInfo);
code = copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pInfo->pRow, pExprSup->pCtx, pRes,
pExprSup->rowEntryInfoOffset, pTaskInfo);
QUERY_CHECK_CODE(code, lino, _end);
pRes->info.rows += pInfo->pRow->numOfRows;
clearWinStateBuff(pBuffInfo);
pInfo->preStateIndex = pInfo->countSup.curStateIndex;
@ -205,8 +207,9 @@ static void buildCountResult(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, S
continue;
}
doUpdateNumOfRows(pExprSup->pCtx, pResultRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pResultRow, pExprSup->pCtx, pBlock,
pExprSup->rowEntryInfoOffset, pTaskInfo);
code = copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pResultRow, pExprSup->pCtx, pBlock,
pExprSup->rowEntryInfoOffset, pTaskInfo);
QUERY_CHECK_CODE(code, lino, _end);
pBlock->info.rows += pResultRow->numOfRows;
clearWinStateBuff(pBuff);
clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs);

View File

@ -260,8 +260,10 @@ static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWi
return setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
}
static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSup, int32_t startIndex, int32_t endIndex,
const SSDataBlock* pBlock, int64_t* tsList, SExecTaskInfo* pTaskInfo) {
static int32_t doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSup, int32_t startIndex,
int32_t endIndex, const SSDataBlock* pBlock, int64_t* tsList,
SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
SWindowRowsSup* pRowSup = &pInfo->winSup;
int32_t numOfOutput = pSup->numOfExprs;
@ -269,15 +271,16 @@ static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSu
doKeepTuple(pRowSup, tsList[endIndex], pBlock->info.id.groupId);
int32_t ret =
setSingleOutputTupleBufv1(&pInfo->binfo.resultRowInfo, &pRowSup->win, &pInfo->pRow, pSup, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
code = setSingleOutputTupleBufv1(&pInfo->binfo.resultRowInfo, &pRowSup->win, &pInfo->pRow, pSup, &pInfo->aggSup);
if (code != TSDB_CODE_SUCCESS) { // null data, too many state code
qError("failed to set single output tuple buffer, code:%d", code);
return code;
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startIndex, numOfRows,
pBlock->info.rows, numOfOutput);
code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startIndex, numOfRows,
pBlock->info.rows, numOfOutput);
return code;
}
int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
@ -287,11 +290,11 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p
SExprSupp* pSup = &pOperator->exprSupp;
SSDataBlock* pRes = pInfo->binfo.pRes;
int64_t gid = pBlock->info.id.groupId;
SColumnInfoData *ps = NULL, *pe = NULL;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
QUERY_CHECK_NULL(pColInfoData, code, lino, _return, terrno);
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
SWindowRowsSup* pRowSup = &pInfo->winSup;
SColumnInfoData *ps = NULL, *pe = NULL;
int32_t rowIndex = 0;
pRowSup->numOfRows = 0;
@ -333,7 +336,8 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p
}
if (rowIndex < pBlock->info.rows) {
doEventWindowAggImpl(pInfo, pSup, startIndex, rowIndex, pBlock, tsList, pTaskInfo);
code = doEventWindowAggImpl(pInfo, pSup, startIndex, rowIndex, pBlock, tsList, pTaskInfo);
QUERY_CHECK_CODE(code, lino, _return);
doUpdateNumOfRows(pSup->pCtx, pInfo->pRow, pSup->numOfExprs, pSup->rowEntryInfoOffset);
// check buffer size
@ -343,8 +347,9 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p
QUERY_CHECK_CODE(code, lino, _return);
}
copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pRes,
pSup->rowEntryInfoOffset, pTaskInfo);
code = copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pRes,
pSup->rowEntryInfoOffset, pTaskInfo);
QUERY_CHECK_CODE(code, lino, _return);
pRes->info.rows += pInfo->pRow->numOfRows;
pInfo->pRow->numOfRows = 0;
@ -352,7 +357,8 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p
pInfo->inWindow = false;
rowIndex += 1;
} else {
doEventWindowAggImpl(pInfo, pSup, startIndex, pBlock->info.rows - 1, pBlock, tsList, pTaskInfo);
code = doEventWindowAggImpl(pInfo, pSup, startIndex, pBlock->info.rows - 1, pBlock, tsList, pTaskInfo);
QUERY_CHECK_CODE(code, lino, _return);
}
} else { // find the first start value that is fulfill for the start condition
for (; rowIndex < pBlock->info.rows; ++rowIndex) {

View File

@ -666,8 +666,8 @@ void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExpr
}
}
void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
int32_t copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
for (int32_t j = 0; j < numOfExprs; ++j) {
@ -690,7 +690,7 @@ void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultR
int32_t winCode = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(winCode)) {
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(winCode));
T_LONG_JMP(pTaskInfo->env, winCode);
QUERY_CHECK_CODE(winCode, lino, _end);
}
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing
@ -710,8 +710,8 @@ void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultR
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
}
// todo refactor. SResultRow has direct pointer in miainfo
@ -747,7 +747,12 @@ void finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPositi
T_LONG_JMP(pTaskInfo->env, code);
}
copyResultrowToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
code = copyResultrowToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
if (TAOS_FAILED(code)) {
releaseBufPage(pBuf, page);
qError("%s copy result row to datablock failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
releaseBufPage(pBuf, page);
pBlock->info.rows += pRow->numOfRows;
@ -818,9 +823,9 @@ void doCopyToSDataBlockByHash(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SEx
pGroupResInfo->iter = iter;
pGroupResInfo->dataPos = pData;
copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
code = copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
releaseBufPage(pBuf, page);
QUERY_CHECK_CODE(code, lino, _end);
pBlock->info.rows += pRow->numOfRows;
if (pBlock->info.rows >= threshold) {
break;
@ -892,9 +897,10 @@ void doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp
}
pGroupResInfo->index += 1;
copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
code = copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
releaseBufPage(pBuf, page);
QUERY_CHECK_CODE(code, lino, _end);
pBlock->info.rows += pRow->numOfRows;
if (pBlock->info.rows >= threshold) {
break;

View File

@ -329,8 +329,11 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
int32_t rowIndex = j - num;
applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows,
pOperator->exprSupp.numOfExprs);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows,
pOperator->exprSupp.numOfExprs);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
// assign the group keys or user input constant values if required
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
@ -347,8 +350,11 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
int32_t rowIndex = pBlock->info.rows - num;
applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows,
pOperator->exprSupp.numOfExprs);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows,
pOperator->exprSupp.numOfExprs);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
}
}

View File

@ -1172,8 +1172,9 @@ static int32_t doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pSDataBlock->info.rows, numOfOutput);
code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos,
forwardRows, pSDataBlock->info.rows, numOfOutput);
QUERY_CHECK_CODE(code, lino, _end);
key.ts = nextWin.skey;
if (pInfo->delKey.ts > key.ts) {
@ -2442,7 +2443,7 @@ int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo*
QUERY_CHECK_CODE(code, lino, _end);
updateTimeWindowInfo(pTimeWindowData, &pCurWin->sessionWin.win, winDelta);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, rows, numOutput);
code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, rows, numOutput);
_end:
if (code != TSDB_CODE_SUCCESS) {
@ -5474,8 +5475,9 @@ static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pSDataBlock->info.rows, numOfOutput);
code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos,
forwardRows, pSDataBlock->info.rows, numOfOutput);
QUERY_CHECK_CODE(code, lino, _end);
key.ts = nextWin.skey;
if (pInfo->delKey.ts > key.ts) {

View File

@ -653,8 +653,11 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
pBlock->info.rows, numOfExprs);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
pBlock->info.rows, numOfExprs);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
closeResultRow(pr);
@ -799,8 +802,11 @@ static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
doCloseWindow(pResultRowInfo, pInfo, pResult);
@ -838,8 +844,11 @@ static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
}
#endif
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
doCloseWindow(pResultRowInfo, pInfo, pResult);
}
@ -1031,8 +1040,11 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
// here we start a new session window
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
@ -1056,8 +1068,11 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
}
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
@ -1492,8 +1507,11 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
// pInfo->numOfRows data belong to the current session window
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
}
// here we start a new session window
@ -1511,8 +1529,11 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
}
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
@ -1874,8 +1895,11 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
@ -1894,8 +1918,11 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
pBlock->info.rows, pSup->numOfExprs);
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
}
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
@ -2223,8 +2250,11 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
ret = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
doCloseWindow(pResultRowInfo, iaInfo, pResult);
// output previous interval results after this interval (&win) is closed
@ -2262,8 +2292,11 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput);
code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
forwardRows, pBlock->info.rows, numOfOutput);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
doCloseWindow(pResultRowInfo, iaInfo, pResult);
// output previous interval results after this interval (&nextWin) is closed